ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.23
Committed: Sun May 27 18:35:57 2018 UTC (5 years, 11 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad5R2, rad5R3
Changes since 2.22: +3 -1 lines
Log Message:
Added internal error check

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.23 static const char RCSid[] = "$Id: rc3.c,v 2.22 2016/08/18 00:52:48 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * Accumulate ray contributions for a set of materials
6     * Controlling process for multiple children
7     */
8    
9 greg 2.14 #include <signal.h>
10 greg 2.1 #include "rcontrib.h"
11     #include "selcall.h"
12    
13 greg 2.10 #define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2))
14    
15 greg 2.1 /* Modifier contribution queue (results waiting to be output) */
16     typedef struct s_binq {
17 greg 2.6 RNUMBER ndx; /* index for this entry */
18     RNUMBER nadded; /* accumulated so far */
19 greg 2.1 struct s_binq *next; /* next in queue */
20     MODCONT *mca[1]; /* contrib. array (extends struct) */
21     } BINQ;
22    
23     static BINQ *out_bq = NULL; /* output bin queue */
24     static BINQ *free_bq = NULL; /* free queue entries */
25    
26 greg 2.21 static SUBPROC kidpr[MAXPROCESS]; /* our child processes */
27    
28 greg 2.6 static struct {
29     RNUMBER r1; /* assigned ray starting index */
30     FILE *infp; /* file pointer to read from process */
31 greg 2.10 int nr; /* number of rays to sum (0 if free) */
32 greg 2.21 } kida[MAXPROCESS]; /* our child process i/o */
33 greg 2.1
34    
35 greg 2.4 /* Get new bin queue entry */
36 greg 2.1 static BINQ *
37     new_binq()
38     {
39 greg 2.4 BINQ *bp;
40 greg 2.1 int i;
41    
42 greg 2.4 if (free_bq != NULL) { /* something already available? */
43     bp = free_bq;
44 greg 2.1 free_bq = bp->next;
45     bp->next = NULL;
46 greg 2.6 bp->nadded = 0;
47 greg 2.1 return(bp);
48     }
49     /* else allocate fresh */
50 greg 2.4 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
51 greg 2.1 if (bp == NULL)
52     goto memerr;
53     for (i = nmods; i--; ) {
54     MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
55     bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
56     sizeof(DCOLOR)*(mp->nbins-1));
57     if (bp->mca[i] == NULL)
58     goto memerr;
59     memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
60     /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
61     }
62     bp->ndx = 0;
63 greg 2.6 bp->nadded = 0;
64 greg 2.1 bp->next = NULL;
65     return(bp);
66     memerr:
67     error(SYSTEM, "out of memory in new_binq()");
68     return(NULL);
69     }
70    
71    
72     /* Free a bin queue entry */
73     static void
74     free_binq(BINQ *bp)
75     {
76     int i;
77    
78     if (bp == NULL) { /* signal to release our free list */
79     while ((bp = free_bq) != NULL) {
80     free_bq = bp->next;
81     for (i = nmods; i--; )
82     free(bp->mca[i]);
83     /* Note: we don't own bp->mca[i]->binv */
84     free(bp);
85     }
86     return;
87     }
88     /* clear sums for next use */
89     /* for (i = nmods; i--; )
90     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
91     */
92 greg 2.23 if (bp->next != NULL)
93     error(CONSISTENCY, "free_binq() handed list");
94 greg 2.1 bp->ndx = 0;
95     bp->next = free_bq; /* push onto free list */
96     free_bq = bp;
97     }
98    
99    
100     /* Add modifier values to accumulation record in queue and clear */
101 greg 2.10 static void
102 greg 2.1 queue_modifiers()
103     {
104     MODCONT *mpin, *mpout;
105     int i, j;
106    
107     if ((accumulate > 0) | (out_bq == NULL))
108     error(CONSISTENCY, "bad call to queue_modifiers()");
109    
110     for (i = nmods; i--; ) {
111     mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
112     mpout = out_bq->mca[i];
113     for (j = mpout->nbins; j--; )
114     addcolor(mpout->cbin[j], mpin->cbin[j]);
115     memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
116     }
117 greg 2.5 out_bq->nadded++;
118 greg 2.1 }
119    
120    
121 greg 2.4 /* Sum one modifier record into another (updates nadded) */
122 greg 2.1 static void
123     add_modbin(BINQ *dst, BINQ *src)
124     {
125     int i, j;
126    
127     for (i = nmods; i--; ) {
128     MODCONT *mpin = src->mca[i];
129     MODCONT *mpout = dst->mca[i];
130     for (j = mpout->nbins; j--; )
131     addcolor(mpout->cbin[j], mpin->cbin[j]);
132     }
133 greg 2.4 dst->nadded += src->nadded;
134 greg 2.1 }
135    
136    
137 greg 2.2 /* Queue values for later output */
138     static void
139 greg 2.1 queue_output(BINQ *bp)
140     {
141     BINQ *b_last, *b_cur;
142    
143     if (accumulate <= 0) { /* just accumulating? */
144     if (out_bq == NULL) {
145     bp->next = NULL;
146     out_bq = bp;
147     } else {
148     add_modbin(out_bq, bp);
149     free_binq(bp);
150     }
151 greg 2.2 return;
152 greg 2.1 }
153 greg 2.10 b_last = NULL; /* insert in output queue */
154 greg 2.1 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
155     b_cur = b_cur->next)
156     b_last = b_cur;
157 greg 2.4
158 greg 2.1 if (b_last != NULL) {
159     bp->next = b_cur;
160     b_last->next = bp;
161     } else {
162     bp->next = out_bq;
163     out_bq = bp;
164     }
165 greg 2.3 if (accumulate == 1) /* no accumulation? */
166 greg 2.2 return;
167     b_cur = out_bq; /* else merge accumulation entries */
168     while (b_cur->next != NULL) {
169 greg 2.4 if (b_cur->nadded >= accumulate ||
170 greg 2.2 (b_cur->ndx-1)/accumulate !=
171     (b_cur->next->ndx-1)/accumulate) {
172     b_cur = b_cur->next;
173     continue;
174 greg 2.1 }
175 greg 2.2 add_modbin(b_cur, b_cur->next);
176     b_last = b_cur->next;
177     b_cur->next = b_last->next;
178     b_last->next = NULL;
179     free_binq(b_last);
180 greg 2.1 }
181 greg 2.2 }
182    
183    
184 greg 2.4 /* Count number of records ready for output */
185     static int
186     queue_ready()
187     {
188     int nready = 0;
189     BINQ *bp;
190    
191     for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
192     bp->ndx == lastdone+nready*accumulate+1;
193     bp = bp->next)
194     ++nready;
195    
196     return(nready);
197     }
198    
199    
200     /* Catch up with output queue by producing ready results */
201 greg 2.2 static int
202 greg 2.4 output_catchup(int nmax)
203 greg 2.2 {
204     int nout = 0;
205     BINQ *bp;
206     int i;
207 greg 2.10 /* output ready results */
208 greg 2.4 while (out_bq != NULL && out_bq->nadded >= accumulate
209     && out_bq->ndx == lastdone+1) {
210     if ((nmax > 0) & (nout >= nmax))
211     break;
212 greg 2.2 bp = out_bq; /* pop off first entry */
213     out_bq = bp->next;
214     bp->next = NULL;
215 greg 2.1 for (i = 0; i < nmods; i++) /* output record */
216 greg 2.2 mod_output(bp->mca[i]);
217 greg 2.1 end_record();
218 greg 2.2 free_binq(bp); /* free this entry */
219 greg 2.1 lastdone += accumulate;
220     ++nout;
221     }
222     return(nout);
223     }
224    
225    
226 greg 2.3 /* Put a zero record in results queue & output */
227 greg 2.1 void
228 greg 2.3 put_zero_record(int ndx)
229 greg 2.1 {
230     BINQ *bp = new_binq();
231     int i;
232    
233     for (i = nmods; i--; )
234     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
235     bp->ndx = ndx;
236 greg 2.6 bp->nadded = 1;
237 greg 2.1 queue_output(bp);
238 greg 2.4 output_catchup(0);
239 greg 2.1 }
240    
241    
242 greg 2.6 /* Get results from child process and add to queue */
243     static void
244     queue_results(int k)
245     {
246     BINQ *bq = new_binq(); /* get results holder */
247     int j;
248    
249     bq->ndx = kida[k].r1;
250     bq->nadded = kida[k].nr;
251     /* read from child */
252     for (j = 0; j < nmods; j++)
253 greg 2.22 if (getbinary(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
254 greg 2.6 kida[k].infp) != bq->mca[j]->nbins)
255     error(SYSTEM, "read error from render process");
256    
257     queue_output(bq); /* put results in output queue */
258     kida[k].nr = 0; /* mark child as available */
259     }
260    
261    
262 greg 2.1 /* callback to set output spec to NULL (stdout) */
263     static int
264     set_stdout(const LUENT *le, void *p)
265     {
266     (*(MODCONT *)le->data).outspec = NULL;
267     return(0);
268     }
269    
270    
271 greg 2.20 /* Start child processes if we can (call only once in parent!) */
272 greg 2.1 int
273     in_rchild()
274     {
275 greg 2.9 int rval;
276    
277     while (nchild < nproc) { /* fork until target reached */
278 greg 2.3 errno = 0;
279 greg 2.21 rval = open_process(&kidpr[nchild], NULL);
280 greg 2.9 if (rval < 0)
281     error(SYSTEM, "open_process() call failed");
282     if (rval == 0) { /* if in child, set up & return true */
283 greg 2.18 lu_doall(&modconttab, &set_stdout, NULL);
284 greg 2.4 lu_done(&ofiletab);
285 greg 2.5 while (nchild--) { /* don't share other pipes */
286 greg 2.21 close(kidpr[nchild].w);
287 greg 2.6 fclose(kida[nchild].infp);
288 greg 2.4 }
289 greg 2.1 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
290     outfmt = 'd';
291     header = 0;
292     yres = 0;
293     raysleft = 0;
294 greg 2.6 if (accumulate == 1) {
295     waitflush = xres = 1;
296     account = accumulate = 1;
297     } else { /* parent controls accumulation */
298     waitflush = xres = 0;
299     account = accumulate = 0;
300     }
301 greg 2.9 return(1); /* return "true" in child */
302 greg 2.1 }
303 greg 2.9 if (rval != PIPE_BUF)
304     error(CONSISTENCY, "bad value from open_process()");
305     /* connect to child's output */
306 greg 2.21 kida[nchild].infp = fdopen(kidpr[nchild].r, "rb");
307 greg 2.6 if (kida[nchild].infp == NULL)
308 greg 2.2 error(SYSTEM, "out of memory in in_rchild()");
309 greg 2.6 kida[nchild++].nr = 0; /* mark as available */
310 greg 2.1 }
311 greg 2.20 #ifdef getc_unlocked
312     for (rval = nchild; rval--; ) /* avoid mutex overhead */
313     flockfile(kida[rval].infp);
314     #endif
315 greg 2.9 return(0); /* return "false" in parent */
316 greg 2.1 }
317    
318    
319     /* Close child processes */
320     void
321 greg 2.12 end_children(int immed)
322 greg 2.1 {
323 greg 2.21 int i;
324    
325     #ifdef SIGKILL /* error mode -- quick exit */
326     for (i = nchild*immed; i-- > 0; )
327     kill(kidpr[nchild].pid, SIGKILL);
328 greg 2.15 #endif
329 greg 2.21 if ((i = close_processes(kidpr, nchild)) > 0 && !immed) {
330     sprintf(errmsg, "rendering process returned bad status (%d)",
331     i);
332 greg 2.1 error(WARNING, errmsg);
333 greg 2.21 }
334     while (nchild-- > 0)
335 greg 2.9 fclose(kida[nchild].infp);
336 greg 2.1 }
337    
338    
339 greg 2.5 /* Wait for the next available child, managing output queue simultaneously */
340 greg 2.1 static int
341 greg 2.5 next_child_nq(int flushing)
342 greg 2.1 {
343 greg 2.2 static struct timeval polling;
344 greg 2.4 struct timeval *pmode;
345 greg 2.2 fd_set readset, errset;
346 greg 2.6 int i, n, nr, nqr;
347 greg 2.1
348 greg 2.5 if (!flushing) /* see if there's one free */
349 greg 2.1 for (i = nchild; i--; )
350 greg 2.6 if (!kida[i].nr)
351 greg 2.1 return(i);
352 greg 2.4
353 greg 2.5 nqr = queue_ready(); /* choose blocking mode or polling */
354     if ((nqr > 0) & !flushing)
355     pmode = &polling;
356     else
357 greg 2.4 pmode = NULL;
358 greg 2.5 tryagain: /* catch up with output? */
359     if (pmode == &polling) {
360     if (nqr > nchild) /* don't get too far behind */
361     nqr -= output_catchup(nqr-nchild);
362     } else if (nqr > 0) /* clear output before blocking */
363     nqr -= output_catchup(0);
364 greg 2.1 /* prepare select() call */
365     FD_ZERO(&readset); FD_ZERO(&errset);
366     n = nr = 0;
367     for (i = nchild; i--; ) {
368 greg 2.6 if (kida[i].nr) {
369 greg 2.21 FD_SET(kidpr[i].r, &readset);
370 greg 2.1 ++nr;
371     }
372 greg 2.21 FD_SET(kidpr[i].r, &errset);
373     if (kidpr[i].r >= n)
374     n = kidpr[i].r + 1;
375 greg 2.1 }
376 greg 2.3 if (!nr) /* nothing to wait for? */
377     return(-1);
378 greg 2.2 if ((nr > 1) | (pmode == &polling)) {
379 greg 2.1 errno = 0;
380 greg 2.5 nr = select(n, &readset, NULL, &errset, pmode);
381     if (!nr) {
382 greg 2.2 pmode = NULL; /* try again, blocking this time */
383     goto tryagain;
384     }
385 greg 2.5 if (nr < 0)
386 greg 2.3 error(SYSTEM, "select() error in next_child_nq()");
387 greg 2.1 } else
388     FD_ZERO(&errset);
389     n = -1; /* read results from child(ren) */
390     for (i = nchild; i--; ) {
391 greg 2.21 if (FD_ISSET(kidpr[i].r, &errset))
392 greg 2.1 error(USER, "rendering process died");
393 greg 2.21 if (FD_ISSET(kidpr[i].r, &readset))
394 greg 2.6 queue_results(n = i);
395 greg 2.1 }
396 greg 2.3 return(n); /* first available child */
397 greg 2.1 }
398    
399    
400     /* Run parental oversight loop */
401     void
402     parental_loop()
403     {
404 greg 2.17 const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
405 greg 2.6 int ninq = 0;
406     FVECT orgdir[2*MAXIQ];
407     int i, n;
408 greg 2.1 /* load rays from stdin & process */
409     #ifdef getc_unlocked
410     flockfile(stdin); /* avoid lock/unlock overhead */
411     #endif
412 greg 2.6 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
413 greg 2.18 const int zero_ray = orgdir[2*ninq+1][0] == 0.0 &&
414     (orgdir[2*ninq+1][1] == 0.0) &
415     (orgdir[2*ninq+1][2] == 0.0);
416     ninq += !zero_ray;
417     /* Zero ray cannot go in input queue */
418     if (zero_ray ? ninq : ninq >= qlimit ||
419 greg 2.6 lastray/accumulate != (lastray+ninq)/accumulate) {
420 greg 2.5 i = next_child_nq(0); /* manages output */
421 greg 2.6 n = ninq;
422 greg 2.18 if (accumulate > 1) /* need terminator? */
423 greg 2.6 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
424     n *= sizeof(FVECT)*2; /* send assignment */
425 greg 2.21 if (writebuf(kidpr[i].w, (char *)orgdir, n) != n)
426 greg 2.1 error(SYSTEM, "pipe write error");
427 greg 2.6 kida[i].r1 = lastray+1;
428     lastray += kida[i].nr = ninq; /* mark as busy */
429     if (lastray < lastdone) { /* RNUMBER wrapped? */
430     while (next_child_nq(1) >= 0)
431     ;
432 greg 2.17 lastray -= ninq;
433     lastdone = lastray %= accumulate;
434 greg 2.6 }
435 greg 2.17 ninq = 0;
436 greg 2.1 }
437 greg 2.18 if (zero_ray) { /* put bogus record? */
438     if ((yres <= 0) | (xres <= 1) &&
439     (lastray+1) % accumulate == 0) {
440     while (next_child_nq(1) >= 0)
441     ; /* clear the queue */
442     lastdone = lastray = accumulate-1;
443     waitflush = 1; /* flush next */
444     }
445     put_zero_record(++lastray);
446     }
447 greg 2.1 if (raysleft && !--raysleft)
448     break; /* preemptive EOI */
449     }
450     while (next_child_nq(1) >= 0) /* empty results queue */
451     ;
452 greg 2.10 if (account < accumulate) {
453     error(WARNING, "partial accumulation in final record");
454     free_binq(out_bq); /* XXX just ignore it */
455 greg 2.3 out_bq = NULL;
456 greg 2.1 }
457 greg 2.10 free_binq(NULL); /* clean up */
458     lu_done(&ofiletab);
459 greg 2.1 if (raysleft)
460     error(USER, "unexpected EOF on input");
461 greg 2.10 }
462    
463    
464     /* Wait for the next available child by monitoring "to" pipes */
465     static int
466     next_child_ready()
467     {
468     fd_set writeset, errset;
469 greg 2.11 int i, n;
470 greg 2.10
471     for (i = nchild; i--; ) /* see if there's one free first */
472     if (!kida[i].nr)
473     return(i);
474     /* prepare select() call */
475     FD_ZERO(&writeset); FD_ZERO(&errset);
476     n = 0;
477     for (i = nchild; i--; ) {
478 greg 2.21 FD_SET(kidpr[i].w, &writeset);
479     FD_SET(kidpr[i].r, &errset);
480     if (kidpr[i].w >= n)
481     n = kidpr[i].w + 1;
482     if (kidpr[i].r >= n)
483     n = kidpr[i].r + 1;
484 greg 2.10 }
485     errno = 0;
486     n = select(n, NULL, &writeset, &errset, NULL);
487     if (n < 0)
488     error(SYSTEM, "select() error in next_child_ready()");
489     n = -1; /* identify waiting child */
490     for (i = nchild; i--; ) {
491 greg 2.21 if (FD_ISSET(kidpr[i].r, &errset))
492 greg 2.10 error(USER, "rendering process died");
493 greg 2.21 if (FD_ISSET(kidpr[i].w, &writeset))
494 greg 2.10 kida[n = i].nr = 0;
495     }
496     return(n); /* first available child */
497     }
498    
499    
500     /* Modified parental loop for full accumulation mode (-c 0) */
501     void
502     feeder_loop()
503     {
504     static int ignore_warning_given = 0;
505     int ninq = 0;
506     FVECT orgdir[2*MAXIQ];
507     int i, n;
508     /* load rays from stdin & process */
509     #ifdef getc_unlocked
510     flockfile(stdin); /* avoid lock/unlock overhead */
511     #endif
512     while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
513     if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
514     (orgdir[2*ninq+1][1] == 0.0) &
515     (orgdir[2*ninq+1][2] == 0.0)) {
516     if (!ignore_warning_given++)
517     error(WARNING,
518     "dummy ray(s) ignored during accumulation\n");
519     continue;
520     }
521     if (++ninq >= MAXIQ) {
522     i = next_child_ready(); /* get eager child */
523     n = sizeof(FVECT)*2 * ninq; /* give assignment */
524 greg 2.21 if (writebuf(kidpr[i].w, (char *)orgdir, n) != n)
525 greg 2.10 error(SYSTEM, "pipe write error");
526     kida[i].r1 = lastray+1;
527     lastray += kida[i].nr = ninq;
528     if (lastray < lastdone) /* RNUMBER wrapped? */
529     lastdone = lastray = 0;
530 greg 2.18 ninq = 0;
531 greg 2.10 }
532     if (raysleft && !--raysleft)
533     break; /* preemptive EOI */
534     }
535     if (ninq) { /* polish off input */
536     i = next_child_ready();
537     n = sizeof(FVECT)*2 * ninq;
538 greg 2.21 if (writebuf(kidpr[i].w, (char *)orgdir, n) != n)
539 greg 2.10 error(SYSTEM, "pipe write error");
540     kida[i].r1 = lastray+1;
541     lastray += kida[i].nr = ninq;
542     ninq = 0;
543     }
544 greg 2.13 memset(orgdir, 0, sizeof(FVECT)*2); /* get results */
545     for (i = nchild; i--; ) {
546 greg 2.21 writebuf(kidpr[i].w, (char *)orgdir, sizeof(FVECT)*2);
547 greg 2.10 queue_results(i);
548     }
549     if (recover) /* and from before? */
550     queue_modifiers();
551 greg 2.12 end_children(0); /* free up file descriptors */
552 greg 2.10 for (i = 0; i < nmods; i++)
553     mod_output(out_bq->mca[i]); /* output accumulated record */
554     end_record();
555     free_binq(out_bq); /* clean up */
556     out_bq = NULL;
557     free_binq(NULL);
558 greg 2.1 lu_done(&ofiletab);
559 greg 2.10 if (raysleft)
560     error(USER, "unexpected EOF on input");
561 greg 2.1 }