ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.14
Committed: Wed Jun 20 21:36:34 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.13: +2 -1 lines
Log Message:
Added missing include file

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: rc3.c,v 2.13 2012/06/19 01:27:13 greg Exp $";
3 #endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include <signal.h>
10 #include "rcontrib.h"
11 #include "platform.h"
12 #include "rtprocess.h"
13 #include "selcall.h"
14
15 #define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2))
16
17 /* Modifier contribution queue (results waiting to be output) */
18 typedef struct s_binq {
19 RNUMBER ndx; /* index for this entry */
20 RNUMBER nadded; /* accumulated so far */
21 struct s_binq *next; /* next in queue */
22 MODCONT *mca[1]; /* contrib. array (extends struct) */
23 } BINQ;
24
25 static BINQ *out_bq = NULL; /* output bin queue */
26 static BINQ *free_bq = NULL; /* free queue entries */
27
28 static struct {
29 RNUMBER r1; /* assigned ray starting index */
30 SUBPROC pr; /* PID, i/o descriptors */
31 FILE *infp; /* file pointer to read from process */
32 int nr; /* number of rays to sum (0 if free) */
33 } kida[MAXPROCESS]; /* our child processes */
34
35
36 /* Get new bin queue entry */
37 static BINQ *
38 new_binq()
39 {
40 BINQ *bp;
41 int i;
42
43 if (free_bq != NULL) { /* something already available? */
44 bp = free_bq;
45 free_bq = bp->next;
46 bp->next = NULL;
47 bp->nadded = 0;
48 return(bp);
49 }
50 /* else allocate fresh */
51 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
52 if (bp == NULL)
53 goto memerr;
54 for (i = nmods; i--; ) {
55 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
56 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
57 sizeof(DCOLOR)*(mp->nbins-1));
58 if (bp->mca[i] == NULL)
59 goto memerr;
60 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
61 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
62 }
63 bp->ndx = 0;
64 bp->nadded = 0;
65 bp->next = NULL;
66 return(bp);
67 memerr:
68 error(SYSTEM, "out of memory in new_binq()");
69 return(NULL);
70 }
71
72
73 /* Free a bin queue entry */
74 static void
75 free_binq(BINQ *bp)
76 {
77 int i;
78
79 if (bp == NULL) { /* signal to release our free list */
80 while ((bp = free_bq) != NULL) {
81 free_bq = bp->next;
82 for (i = nmods; i--; )
83 free(bp->mca[i]);
84 /* Note: we don't own bp->mca[i]->binv */
85 free(bp);
86 }
87 return;
88 }
89 /* clear sums for next use */
90 /* for (i = nmods; i--; )
91 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
92 */
93 bp->ndx = 0;
94 bp->next = free_bq; /* push onto free list */
95 free_bq = bp;
96 }
97
98
99 /* Add modifier values to accumulation record in queue and clear */
100 static void
101 queue_modifiers()
102 {
103 MODCONT *mpin, *mpout;
104 int i, j;
105
106 if ((accumulate > 0) | (out_bq == NULL))
107 error(CONSISTENCY, "bad call to queue_modifiers()");
108
109 for (i = nmods; i--; ) {
110 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
111 mpout = out_bq->mca[i];
112 for (j = mpout->nbins; j--; )
113 addcolor(mpout->cbin[j], mpin->cbin[j]);
114 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
115 }
116 out_bq->nadded++;
117 }
118
119
120 /* Sum one modifier record into another (updates nadded) */
121 static void
122 add_modbin(BINQ *dst, BINQ *src)
123 {
124 int i, j;
125
126 for (i = nmods; i--; ) {
127 MODCONT *mpin = src->mca[i];
128 MODCONT *mpout = dst->mca[i];
129 for (j = mpout->nbins; j--; )
130 addcolor(mpout->cbin[j], mpin->cbin[j]);
131 }
132 dst->nadded += src->nadded;
133 }
134
135
136 /* Queue values for later output */
137 static void
138 queue_output(BINQ *bp)
139 {
140 BINQ *b_last, *b_cur;
141
142 if (accumulate <= 0) { /* just accumulating? */
143 if (out_bq == NULL) {
144 bp->next = NULL;
145 out_bq = bp;
146 } else {
147 add_modbin(out_bq, bp);
148 free_binq(bp);
149 }
150 return;
151 }
152 b_last = NULL; /* insert in output queue */
153 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
154 b_cur = b_cur->next)
155 b_last = b_cur;
156
157 if (b_last != NULL) {
158 bp->next = b_cur;
159 b_last->next = bp;
160 } else {
161 bp->next = out_bq;
162 out_bq = bp;
163 }
164 if (accumulate == 1) /* no accumulation? */
165 return;
166 b_cur = out_bq; /* else merge accumulation entries */
167 while (b_cur->next != NULL) {
168 if (b_cur->nadded >= accumulate ||
169 (b_cur->ndx-1)/accumulate !=
170 (b_cur->next->ndx-1)/accumulate) {
171 b_cur = b_cur->next;
172 continue;
173 }
174 add_modbin(b_cur, b_cur->next);
175 b_last = b_cur->next;
176 b_cur->next = b_last->next;
177 b_last->next = NULL;
178 free_binq(b_last);
179 }
180 }
181
182
183 /* Count number of records ready for output */
184 static int
185 queue_ready()
186 {
187 int nready = 0;
188 BINQ *bp;
189
190 for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
191 bp->ndx == lastdone+nready*accumulate+1;
192 bp = bp->next)
193 ++nready;
194
195 return(nready);
196 }
197
198
199 /* Catch up with output queue by producing ready results */
200 static int
201 output_catchup(int nmax)
202 {
203 int nout = 0;
204 BINQ *bp;
205 int i;
206 /* output ready results */
207 while (out_bq != NULL && out_bq->nadded >= accumulate
208 && out_bq->ndx == lastdone+1) {
209 if ((nmax > 0) & (nout >= nmax))
210 break;
211 bp = out_bq; /* pop off first entry */
212 out_bq = bp->next;
213 bp->next = NULL;
214 for (i = 0; i < nmods; i++) /* output record */
215 mod_output(bp->mca[i]);
216 end_record();
217 free_binq(bp); /* free this entry */
218 lastdone += accumulate;
219 ++nout;
220 }
221 return(nout);
222 }
223
224
225 /* Put a zero record in results queue & output */
226 void
227 put_zero_record(int ndx)
228 {
229 BINQ *bp = new_binq();
230 int i;
231
232 for (i = nmods; i--; )
233 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
234 bp->ndx = ndx;
235 bp->nadded = 1;
236 queue_output(bp);
237 output_catchup(0);
238 }
239
240
241 /* Get results from child process and add to queue */
242 static void
243 queue_results(int k)
244 {
245 BINQ *bq = new_binq(); /* get results holder */
246 int j;
247
248 bq->ndx = kida[k].r1;
249 bq->nadded = kida[k].nr;
250 /* read from child */
251 for (j = 0; j < nmods; j++)
252 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
253 kida[k].infp) != bq->mca[j]->nbins)
254 error(SYSTEM, "read error from render process");
255
256 queue_output(bq); /* put results in output queue */
257 kida[k].nr = 0; /* mark child as available */
258 }
259
260
261 /* callback to set output spec to NULL (stdout) */
262 static int
263 set_stdout(const LUENT *le, void *p)
264 {
265 (*(MODCONT *)le->data).outspec = NULL;
266 return(0);
267 }
268
269
270 /* Start child processes if we can */
271 int
272 in_rchild()
273 {
274 int rval;
275
276 while (nchild < nproc) { /* fork until target reached */
277 errno = 0;
278 rval = open_process(&kida[nchild].pr, NULL);
279 if (rval < 0)
280 error(SYSTEM, "open_process() call failed");
281 if (rval == 0) { /* if in child, set up & return true */
282 lu_doall(&modconttab, set_stdout, NULL);
283 lu_done(&ofiletab);
284 while (nchild--) { /* don't share other pipes */
285 close(kida[nchild].pr.w);
286 fclose(kida[nchild].infp);
287 }
288 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
289 outfmt = 'd';
290 header = 0;
291 yres = 0;
292 raysleft = 0;
293 if (accumulate == 1) {
294 waitflush = xres = 1;
295 account = accumulate = 1;
296 } else { /* parent controls accumulation */
297 waitflush = xres = 0;
298 account = accumulate = 0;
299 }
300 return(1); /* return "true" in child */
301 }
302 if (rval != PIPE_BUF)
303 error(CONSISTENCY, "bad value from open_process()");
304 /* connect to child's output */
305 kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
306 if (kida[nchild].infp == NULL)
307 error(SYSTEM, "out of memory in in_rchild()");
308 #ifdef getc_unlocked
309 flockfile(kida[nchild].infp); /* avoid mutex overhead */
310 #endif
311 kida[nchild++].nr = 0; /* mark as available */
312 }
313 return(0); /* return "false" in parent */
314 }
315
316
317 /* Close child processes */
318 void
319 end_children(int immed)
320 {
321 int status;
322
323 while (nchild > 0) {
324 nchild--;
325 if (immed) /* error mode -- quick exit */
326 kill(kida[nchild].pr.pid, SIGKILL);
327 if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
328 sprintf(errmsg,
329 "rendering process returned bad status (%d)",
330 status);
331 error(WARNING, errmsg);
332 }
333 fclose(kida[nchild].infp);
334 }
335 }
336
337
338 /* Wait for the next available child, managing output queue simultaneously */
339 static int
340 next_child_nq(int flushing)
341 {
342 static struct timeval polling;
343 struct timeval *pmode;
344 fd_set readset, errset;
345 int i, n, nr, nqr;
346
347 if (!flushing) /* see if there's one free */
348 for (i = nchild; i--; )
349 if (!kida[i].nr)
350 return(i);
351
352 nqr = queue_ready(); /* choose blocking mode or polling */
353 if ((nqr > 0) & !flushing)
354 pmode = &polling;
355 else
356 pmode = NULL;
357 tryagain: /* catch up with output? */
358 if (pmode == &polling) {
359 if (nqr > nchild) /* don't get too far behind */
360 nqr -= output_catchup(nqr-nchild);
361 } else if (nqr > 0) /* clear output before blocking */
362 nqr -= output_catchup(0);
363 /* prepare select() call */
364 FD_ZERO(&readset); FD_ZERO(&errset);
365 n = nr = 0;
366 for (i = nchild; i--; ) {
367 if (kida[i].nr) {
368 FD_SET(kida[i].pr.r, &readset);
369 ++nr;
370 }
371 FD_SET(kida[i].pr.r, &errset);
372 if (kida[i].pr.r >= n)
373 n = kida[i].pr.r + 1;
374 }
375 if (!nr) /* nothing to wait for? */
376 return(-1);
377 if ((nr > 1) | (pmode == &polling)) {
378 errno = 0;
379 nr = select(n, &readset, NULL, &errset, pmode);
380 if (!nr) {
381 pmode = NULL; /* try again, blocking this time */
382 goto tryagain;
383 }
384 if (nr < 0)
385 error(SYSTEM, "select() error in next_child_nq()");
386 } else
387 FD_ZERO(&errset);
388 n = -1; /* read results from child(ren) */
389 for (i = nchild; i--; ) {
390 if (FD_ISSET(kida[i].pr.r, &errset))
391 error(USER, "rendering process died");
392 if (FD_ISSET(kida[i].pr.r, &readset))
393 queue_results(n = i);
394 }
395 return(n); /* first available child */
396 }
397
398
399 /* Run parental oversight loop */
400 void
401 parental_loop()
402 {
403 static int ignore_warning_given = 0;
404 int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
405 int ninq = 0;
406 FVECT orgdir[2*MAXIQ];
407 int i, n;
408 /* load rays from stdin & process */
409 #ifdef getc_unlocked
410 flockfile(stdin); /* avoid lock/unlock overhead */
411 #endif
412 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
413 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
414 (orgdir[2*ninq+1][1] == 0.0) &
415 (orgdir[2*ninq+1][2] == 0.0)) {
416 if (accumulate != 1) {
417 if (!ignore_warning_given++)
418 error(WARNING,
419 "dummy ray(s) ignored during accumulation\n");
420 continue;
421 }
422 while (next_child_nq(1) >= 0)
423 ; /* clear the queue */
424 lastdone = lastray = 0;
425 if ((yres <= 0) | (xres <= 0))
426 waitflush = 1; /* flush next */
427 put_zero_record(++lastray);
428 } else if (++ninq >= qlimit ||
429 lastray/accumulate != (lastray+ninq)/accumulate) {
430 i = next_child_nq(0); /* manages output */
431 n = ninq;
432 if (accumulate != 1) /* request flush? */
433 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
434 n *= sizeof(FVECT)*2; /* send assignment */
435 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
436 error(SYSTEM, "pipe write error");
437 kida[i].r1 = lastray+1;
438 lastray += kida[i].nr = ninq; /* mark as busy */
439 ninq = 0;
440 if (lastray < lastdone) { /* RNUMBER wrapped? */
441 while (next_child_nq(1) >= 0)
442 ;
443 lastdone = lastray = 0;
444 }
445 }
446 if (raysleft && !--raysleft)
447 break; /* preemptive EOI */
448 }
449 while (next_child_nq(1) >= 0) /* empty results queue */
450 ;
451 if (account < accumulate) {
452 error(WARNING, "partial accumulation in final record");
453 free_binq(out_bq); /* XXX just ignore it */
454 out_bq = NULL;
455 }
456 free_binq(NULL); /* clean up */
457 lu_done(&ofiletab);
458 if (raysleft)
459 error(USER, "unexpected EOF on input");
460 }
461
462
463 /* Wait for the next available child by monitoring "to" pipes */
464 static int
465 next_child_ready()
466 {
467 fd_set writeset, errset;
468 int i, n;
469
470 for (i = nchild; i--; ) /* see if there's one free first */
471 if (!kida[i].nr)
472 return(i);
473 /* prepare select() call */
474 FD_ZERO(&writeset); FD_ZERO(&errset);
475 n = 0;
476 for (i = nchild; i--; ) {
477 FD_SET(kida[i].pr.w, &writeset);
478 FD_SET(kida[i].pr.r, &errset);
479 if (kida[i].pr.w >= n)
480 n = kida[i].pr.w + 1;
481 if (kida[i].pr.r >= n)
482 n = kida[i].pr.r + 1;
483 }
484 errno = 0;
485 n = select(n, NULL, &writeset, &errset, NULL);
486 if (n < 0)
487 error(SYSTEM, "select() error in next_child_ready()");
488 n = -1; /* identify waiting child */
489 for (i = nchild; i--; ) {
490 if (FD_ISSET(kida[i].pr.r, &errset))
491 error(USER, "rendering process died");
492 if (FD_ISSET(kida[i].pr.w, &writeset))
493 kida[n = i].nr = 0;
494 }
495 return(n); /* first available child */
496 }
497
498
499 /* Modified parental loop for full accumulation mode (-c 0) */
500 void
501 feeder_loop()
502 {
503 static int ignore_warning_given = 0;
504 int ninq = 0;
505 FVECT orgdir[2*MAXIQ];
506 int i, n;
507 /* load rays from stdin & process */
508 #ifdef getc_unlocked
509 flockfile(stdin); /* avoid lock/unlock overhead */
510 #endif
511 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
512 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
513 (orgdir[2*ninq+1][1] == 0.0) &
514 (orgdir[2*ninq+1][2] == 0.0)) {
515 if (!ignore_warning_given++)
516 error(WARNING,
517 "dummy ray(s) ignored during accumulation\n");
518 continue;
519 }
520 if (++ninq >= MAXIQ) {
521 i = next_child_ready(); /* get eager child */
522 n = sizeof(FVECT)*2 * ninq; /* give assignment */
523 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
524 error(SYSTEM, "pipe write error");
525 kida[i].r1 = lastray+1;
526 lastray += kida[i].nr = ninq;
527 ninq = 0;
528 if (lastray < lastdone) /* RNUMBER wrapped? */
529 lastdone = lastray = 0;
530 }
531 if (raysleft && !--raysleft)
532 break; /* preemptive EOI */
533 }
534 if (ninq) { /* polish off input */
535 i = next_child_ready();
536 n = sizeof(FVECT)*2 * ninq;
537 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
538 error(SYSTEM, "pipe write error");
539 kida[i].r1 = lastray+1;
540 lastray += kida[i].nr = ninq;
541 ninq = 0;
542 }
543 memset(orgdir, 0, sizeof(FVECT)*2); /* get results */
544 for (i = nchild; i--; ) {
545 writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
546 queue_results(i);
547 }
548 if (recover) /* and from before? */
549 queue_modifiers();
550 end_children(0); /* free up file descriptors */
551 for (i = 0; i < nmods; i++)
552 mod_output(out_bq->mca[i]); /* output accumulated record */
553 end_record();
554 free_binq(out_bq); /* clean up */
555 out_bq = NULL;
556 free_binq(NULL);
557 lu_done(&ofiletab);
558 if (raysleft)
559 error(USER, "unexpected EOF on input");
560 }