ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.18
Committed: Thu Nov 15 19:41:03 2012 UTC (11 years, 4 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad4R2P2, rad4R2, rad4R2P1
Changes since 2.17: +20 -27 lines
Log Message:
Tweaks and fixes related to flushing with -c >1

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: rc3.c,v 2.17 2012/11/15 15:26:52 greg Exp $";
3 #endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include <signal.h>
10 #include "rcontrib.h"
11 #include "selcall.h"
12
/* Largest number of origin/direction FVECT pairs that fit in PIPE_BUF bytes */
#define MAXIQ		((int)(PIPE_BUF / (2*sizeof(FVECT))))
14
15 /* Modifier contribution queue (results waiting to be output) */
16 typedef struct s_binq {
17 RNUMBER ndx; /* index for this entry */
18 RNUMBER nadded; /* accumulated so far */
19 struct s_binq *next; /* next in queue */
20 MODCONT *mca[1]; /* contrib. array (extends struct) */
21 } BINQ;
22
23 static BINQ *out_bq = NULL; /* output bin queue */
24 static BINQ *free_bq = NULL; /* free queue entries */
25
26 static struct {
27 RNUMBER r1; /* assigned ray starting index */
28 SUBPROC pr; /* PID, i/o descriptors */
29 FILE *infp; /* file pointer to read from process */
30 int nr; /* number of rays to sum (0 if free) */
31 } kida[MAXPROCESS]; /* our child processes */
32
33
34 /* Get new bin queue entry */
35 static BINQ *
36 new_binq()
37 {
38 BINQ *bp;
39 int i;
40
41 if (free_bq != NULL) { /* something already available? */
42 bp = free_bq;
43 free_bq = bp->next;
44 bp->next = NULL;
45 bp->nadded = 0;
46 return(bp);
47 }
48 /* else allocate fresh */
49 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
50 if (bp == NULL)
51 goto memerr;
52 for (i = nmods; i--; ) {
53 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
54 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
55 sizeof(DCOLOR)*(mp->nbins-1));
56 if (bp->mca[i] == NULL)
57 goto memerr;
58 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
59 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
60 }
61 bp->ndx = 0;
62 bp->nadded = 0;
63 bp->next = NULL;
64 return(bp);
65 memerr:
66 error(SYSTEM, "out of memory in new_binq()");
67 return(NULL);
68 }
69
70
71 /* Free a bin queue entry */
72 static void
73 free_binq(BINQ *bp)
74 {
75 int i;
76
77 if (bp == NULL) { /* signal to release our free list */
78 while ((bp = free_bq) != NULL) {
79 free_bq = bp->next;
80 for (i = nmods; i--; )
81 free(bp->mca[i]);
82 /* Note: we don't own bp->mca[i]->binv */
83 free(bp);
84 }
85 return;
86 }
87 /* clear sums for next use */
88 /* for (i = nmods; i--; )
89 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
90 */
91 bp->ndx = 0;
92 bp->next = free_bq; /* push onto free list */
93 free_bq = bp;
94 }
95
96
97 /* Add modifier values to accumulation record in queue and clear */
98 static void
99 queue_modifiers()
100 {
101 MODCONT *mpin, *mpout;
102 int i, j;
103
104 if ((accumulate > 0) | (out_bq == NULL))
105 error(CONSISTENCY, "bad call to queue_modifiers()");
106
107 for (i = nmods; i--; ) {
108 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
109 mpout = out_bq->mca[i];
110 for (j = mpout->nbins; j--; )
111 addcolor(mpout->cbin[j], mpin->cbin[j]);
112 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
113 }
114 out_bq->nadded++;
115 }
116
117
118 /* Sum one modifier record into another (updates nadded) */
119 static void
120 add_modbin(BINQ *dst, BINQ *src)
121 {
122 int i, j;
123
124 for (i = nmods; i--; ) {
125 MODCONT *mpin = src->mca[i];
126 MODCONT *mpout = dst->mca[i];
127 for (j = mpout->nbins; j--; )
128 addcolor(mpout->cbin[j], mpin->cbin[j]);
129 }
130 dst->nadded += src->nadded;
131 }
132
133
134 /* Queue values for later output */
135 static void
136 queue_output(BINQ *bp)
137 {
138 BINQ *b_last, *b_cur;
139
140 if (accumulate <= 0) { /* just accumulating? */
141 if (out_bq == NULL) {
142 bp->next = NULL;
143 out_bq = bp;
144 } else {
145 add_modbin(out_bq, bp);
146 free_binq(bp);
147 }
148 return;
149 }
150 b_last = NULL; /* insert in output queue */
151 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
152 b_cur = b_cur->next)
153 b_last = b_cur;
154
155 if (b_last != NULL) {
156 bp->next = b_cur;
157 b_last->next = bp;
158 } else {
159 bp->next = out_bq;
160 out_bq = bp;
161 }
162 if (accumulate == 1) /* no accumulation? */
163 return;
164 b_cur = out_bq; /* else merge accumulation entries */
165 while (b_cur->next != NULL) {
166 if (b_cur->nadded >= accumulate ||
167 (b_cur->ndx-1)/accumulate !=
168 (b_cur->next->ndx-1)/accumulate) {
169 b_cur = b_cur->next;
170 continue;
171 }
172 add_modbin(b_cur, b_cur->next);
173 b_last = b_cur->next;
174 b_cur->next = b_last->next;
175 b_last->next = NULL;
176 free_binq(b_last);
177 }
178 }
179
180
181 /* Count number of records ready for output */
182 static int
183 queue_ready()
184 {
185 int nready = 0;
186 BINQ *bp;
187
188 for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
189 bp->ndx == lastdone+nready*accumulate+1;
190 bp = bp->next)
191 ++nready;
192
193 return(nready);
194 }
195
196
197 /* Catch up with output queue by producing ready results */
198 static int
199 output_catchup(int nmax)
200 {
201 int nout = 0;
202 BINQ *bp;
203 int i;
204 /* output ready results */
205 while (out_bq != NULL && out_bq->nadded >= accumulate
206 && out_bq->ndx == lastdone+1) {
207 if ((nmax > 0) & (nout >= nmax))
208 break;
209 bp = out_bq; /* pop off first entry */
210 out_bq = bp->next;
211 bp->next = NULL;
212 for (i = 0; i < nmods; i++) /* output record */
213 mod_output(bp->mca[i]);
214 end_record();
215 free_binq(bp); /* free this entry */
216 lastdone += accumulate;
217 ++nout;
218 }
219 return(nout);
220 }
221
222
223 /* Put a zero record in results queue & output */
224 void
225 put_zero_record(int ndx)
226 {
227 BINQ *bp = new_binq();
228 int i;
229
230 for (i = nmods; i--; )
231 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
232 bp->ndx = ndx;
233 bp->nadded = 1;
234 queue_output(bp);
235 output_catchup(0);
236 }
237
238
239 /* Get results from child process and add to queue */
240 static void
241 queue_results(int k)
242 {
243 BINQ *bq = new_binq(); /* get results holder */
244 int j;
245
246 bq->ndx = kida[k].r1;
247 bq->nadded = kida[k].nr;
248 /* read from child */
249 for (j = 0; j < nmods; j++)
250 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
251 kida[k].infp) != bq->mca[j]->nbins)
252 error(SYSTEM, "read error from render process");
253
254 queue_output(bq); /* put results in output queue */
255 kida[k].nr = 0; /* mark child as available */
256 }
257
258
259 /* callback to set output spec to NULL (stdout) */
260 static int
261 set_stdout(const LUENT *le, void *p)
262 {
263 (*(MODCONT *)le->data).outspec = NULL;
264 return(0);
265 }
266
267
268 /* Start child processes if we can */
269 int
270 in_rchild()
271 {
272 int rval;
273
274 while (nchild < nproc) { /* fork until target reached */
275 errno = 0;
276 rval = open_process(&kida[nchild].pr, NULL);
277 if (rval < 0)
278 error(SYSTEM, "open_process() call failed");
279 if (rval == 0) { /* if in child, set up & return true */
280 lu_doall(&modconttab, &set_stdout, NULL);
281 lu_done(&ofiletab);
282 while (nchild--) { /* don't share other pipes */
283 close(kida[nchild].pr.w);
284 fclose(kida[nchild].infp);
285 }
286 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
287 outfmt = 'd';
288 header = 0;
289 yres = 0;
290 raysleft = 0;
291 if (accumulate == 1) {
292 waitflush = xres = 1;
293 account = accumulate = 1;
294 } else { /* parent controls accumulation */
295 waitflush = xres = 0;
296 account = accumulate = 0;
297 }
298 return(1); /* return "true" in child */
299 }
300 if (rval != PIPE_BUF)
301 error(CONSISTENCY, "bad value from open_process()");
302 /* connect to child's output */
303 kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
304 if (kida[nchild].infp == NULL)
305 error(SYSTEM, "out of memory in in_rchild()");
306 #ifdef getc_unlocked
307 flockfile(kida[nchild].infp); /* avoid mutex overhead */
308 #endif
309 kida[nchild++].nr = 0; /* mark as available */
310 }
311 return(0); /* return "false" in parent */
312 }
313
314
315 /* Close child processes */
316 void
317 end_children(int immed)
318 {
319 int status;
320
321 while (nchild > 0) {
322 nchild--;
323 #ifdef SIGKILL
324 if (immed) /* error mode -- quick exit */
325 kill(kida[nchild].pr.pid, SIGKILL);
326 #endif
327 if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
328 sprintf(errmsg,
329 "rendering process returned bad status (%d)",
330 status);
331 error(WARNING, errmsg);
332 }
333 fclose(kida[nchild].infp);
334 }
335 }
336
337
338 /* Wait for the next available child, managing output queue simultaneously.
 *
 * flushing: when zero, an idle child (nr == 0) is returned immediately if
 *	one exists.  When nonzero, idle children are skipped and the call
 *	always waits for results from a busy one.
 *
 * While waiting, records that are ready in the output queue are written
 * with output_catchup().
 *
 * Returns the index of a child whose results were just read (it has been
 * marked idle by queue_results()), or -1 if no child is busy.
 */
339 static int
340 next_child_nq(int flushing)
341 {
342 static struct timeval polling; /* static storage, so a zero timeout */
343 struct timeval *pmode; /* select() timeout: &polling or NULL (block) */
344 fd_set readset, errset;
345 int i, n, nr, nqr;
346
347 if (!flushing) /* see if there's one free */
348 for (i = nchild; i--; )
349 if (!kida[i].nr)
350 return(i);
351
352 nqr = queue_ready(); /* choose blocking mode or polling */
353 if ((nqr > 0) & !flushing)
354 pmode = &polling;
355 else
356 pmode = NULL;
357 tryagain: /* catch up with output? */
358 if (pmode == &polling) {
359 if (nqr > nchild) /* don't get too far behind */
360 nqr -= output_catchup(nqr-nchild);
361 } else if (nqr > 0) /* clear output before blocking */
362 nqr -= output_catchup(0);
363 /* prepare select() call */
/* readset gets only busy children; errset gets every child */
364 FD_ZERO(&readset); FD_ZERO(&errset);
365 n = nr = 0;
366 for (i = nchild; i--; ) {
367 if (kida[i].nr) {
368 FD_SET(kida[i].pr.r, &readset);
369 ++nr;
370 }
371 FD_SET(kida[i].pr.r, &errset);
372 if (kida[i].pr.r >= n)
373 n = kida[i].pr.r + 1;
374 }
375 if (!nr) /* nothing to wait for? */
376 return(-1);
/* select() is skipped when blocking on exactly one busy child */
377 if ((nr > 1) | (pmode == &polling)) {
378 errno = 0;
379 nr = select(n, &readset, NULL, &errset, pmode);
380 if (!nr) {
381 pmode = NULL; /* try again, blocking this time */
382 goto tryagain;
383 }
384 if (nr < 0)
385 error(SYSTEM, "select() error in next_child_nq()");
386 } else
387 FD_ZERO(&errset); /* readset still holds the one busy child */
388 n = -1; /* read results from child(ren) */
/* the loop counts down, so n ends up as the lowest index that was read */
389 for (i = nchild; i--; ) {
390 if (FD_ISSET(kida[i].pr.r, &errset))
391 error(USER, "rendering process died");
392 if (FD_ISSET(kida[i].pr.r, &readset))
393 queue_results(n = i);
394 }
395 return(n); /* first available child */
396 }
397
398
399 /* Run parental oversight loop.
 *
 * Reads origin/direction pairs with getvec() until either read fails,
 * collects them in orgdir[] and writes each batch to a child chosen by
 * next_child_nq(), which also takes care of output.  A pair whose
 * direction is all zero is never sent to a child; it forces out any
 * pending batch and is recorded with put_zero_record().
 *
 * After the input ends, all children are drained, a leftover partial
 * accumulation (account < accumulate) is discarded with a warning, and
 * a nonzero raysleft is reported as an unexpected EOF.
 *
 * NOTE(review): lastray/accumulate below presumes accumulate != 0 here --
 * confirm against the caller.
 */
400 void
401 parental_loop()
402 {
/* MAXIQ-1 leaves room in orgdir[] for the zero terminator added below */
403 const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
404 int ninq = 0; /* number of rays waiting in orgdir[] */
405 FVECT orgdir[2*MAXIQ]; /* origin at [2*k], direction at [2*k+1] */
406 int i, n;
407 /* load rays from stdin & process */
408 #ifdef getc_unlocked
409 flockfile(stdin); /* avoid lock/unlock overhead */
410 #endif
411 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
412 const int zero_ray = orgdir[2*ninq+1][0] == 0.0 &&
413 (orgdir[2*ninq+1][1] == 0.0) &
414 (orgdir[2*ninq+1][2] == 0.0);
415 ninq += !zero_ray;
416 /* Zero ray cannot go in input queue */
/* ?: binds looser than ||, so this is
 *	zero_ray ? ninq : (ninq >= qlimit || <batch crosses a record boundary>)
 * i.e. a zero ray sends whatever is pending, a normal ray sends when the
 * batch is full or the queued rays reach into the next group of
 * accumulate rays.
 */
417 if (zero_ray ? ninq : ninq >= qlimit ||
418 lastray/accumulate != (lastray+ninq)/accumulate) {
419 i = next_child_nq(0); /* manages output */
420 n = ninq;
421 if (accumulate > 1) /* need terminator? */
422 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
423 n *= sizeof(FVECT)*2; /* send assignment */
424 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
425 error(SYSTEM, "pipe write error");
426 kida[i].r1 = lastray+1;
427 lastray += kida[i].nr = ninq; /* mark as busy */
428 if (lastray < lastdone) { /* RNUMBER wrapped? */
/* drain every child, undo the addition, then reduce both counters
 * modulo accumulate */
429 while (next_child_nq(1) >= 0)
430 ;
431 lastray -= ninq;
432 lastdone = lastray %= accumulate;
433 }
434 ninq = 0;
435 }
436 if (zero_ray) { /* put bogus record? */
/* | binds tighter than &&: ((yres <= 0) | (xres <= 1)) && ... */
437 if ((yres <= 0) | (xres <= 1) &&
438 (lastray+1) % accumulate == 0) {
439 while (next_child_nq(1) >= 0)
440 ; /* clear the queue */
441 lastdone = lastray = accumulate-1;
442 waitflush = 1; /* flush next */
443 }
444 put_zero_record(++lastray);
445 }
446 if (raysleft && !--raysleft)
447 break; /* preemptive EOI */
448 }
449 while (next_child_nq(1) >= 0) /* empty results queue */
450 ;
451 if (account < accumulate) {
452 error(WARNING, "partial accumulation in final record");
453 free_binq(out_bq); /* XXX just ignore it */
454 out_bq = NULL;
455 }
456 free_binq(NULL); /* clean up */
457 lu_done(&ofiletab);
458 if (raysleft)
459 error(USER, "unexpected EOF on input");
460 }
461
462
463 /* Wait for the next available child by monitoring "to" pipes */
464 static int
465 next_child_ready()
466 {
467 fd_set writeset, errset;
468 int i, n;
469
470 for (i = nchild; i--; ) /* see if there's one free first */
471 if (!kida[i].nr)
472 return(i);
473 /* prepare select() call */
474 FD_ZERO(&writeset); FD_ZERO(&errset);
475 n = 0;
476 for (i = nchild; i--; ) {
477 FD_SET(kida[i].pr.w, &writeset);
478 FD_SET(kida[i].pr.r, &errset);
479 if (kida[i].pr.w >= n)
480 n = kida[i].pr.w + 1;
481 if (kida[i].pr.r >= n)
482 n = kida[i].pr.r + 1;
483 }
484 errno = 0;
485 n = select(n, NULL, &writeset, &errset, NULL);
486 if (n < 0)
487 error(SYSTEM, "select() error in next_child_ready()");
488 n = -1; /* identify waiting child */
489 for (i = nchild; i--; ) {
490 if (FD_ISSET(kida[i].pr.r, &errset))
491 error(USER, "rendering process died");
492 if (FD_ISSET(kida[i].pr.w, &writeset))
493 kida[n = i].nr = 0;
494 }
495 return(n); /* first available child */
496 }
497
498
499 /* Modified parental loop for full accumulation mode (-c 0) */
500 void
501 feeder_loop()
502 {
503 static int ignore_warning_given = 0;
504 int ninq = 0;
505 FVECT orgdir[2*MAXIQ];
506 int i, n;
507 /* load rays from stdin & process */
508 #ifdef getc_unlocked
509 flockfile(stdin); /* avoid lock/unlock overhead */
510 #endif
511 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
512 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
513 (orgdir[2*ninq+1][1] == 0.0) &
514 (orgdir[2*ninq+1][2] == 0.0)) {
515 if (!ignore_warning_given++)
516 error(WARNING,
517 "dummy ray(s) ignored during accumulation\n");
518 continue;
519 }
520 if (++ninq >= MAXIQ) {
521 i = next_child_ready(); /* get eager child */
522 n = sizeof(FVECT)*2 * ninq; /* give assignment */
523 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
524 error(SYSTEM, "pipe write error");
525 kida[i].r1 = lastray+1;
526 lastray += kida[i].nr = ninq;
527 if (lastray < lastdone) /* RNUMBER wrapped? */
528 lastdone = lastray = 0;
529 ninq = 0;
530 }
531 if (raysleft && !--raysleft)
532 break; /* preemptive EOI */
533 }
534 if (ninq) { /* polish off input */
535 i = next_child_ready();
536 n = sizeof(FVECT)*2 * ninq;
537 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
538 error(SYSTEM, "pipe write error");
539 kida[i].r1 = lastray+1;
540 lastray += kida[i].nr = ninq;
541 ninq = 0;
542 }
543 memset(orgdir, 0, sizeof(FVECT)*2); /* get results */
544 for (i = nchild; i--; ) {
545 writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
546 queue_results(i);
547 }
548 if (recover) /* and from before? */
549 queue_modifiers();
550 end_children(0); /* free up file descriptors */
551 for (i = 0; i < nmods; i++)
552 mod_output(out_bq->mca[i]); /* output accumulated record */
553 end_record();
554 free_binq(out_bq); /* clean up */
555 out_bq = NULL;
556 free_binq(NULL);
557 lu_done(&ofiletab);
558 if (raysleft)
559 error(USER, "unexpected EOF on input");
560 }