ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.12
Committed: Tue Jun 19 00:12:08 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.11: +6 -4 lines
Log Message:
Added quick-exit mode and fixed bug in recovery option with -c > 1

File Contents

# Content
#ifndef lint
/* Revision control identification string kept in the compiled object */
static const char	RCSid[] =
	"$Id: rc3.c,v 2.11 2012/06/16 17:30:13 greg Exp $";
#endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include "rcontrib.h"
10 #include "platform.h"
11 #include "rtprocess.h"
12 #include "selcall.h"
13
/*
 * Maximum number of rays (origin + direction vector pairs) held in one
 * assignment buffer, chosen so that a full buffer occupies no more than
 * PIPE_BUF bytes.  The expansion is fully parenthesized so that it
 * behaves as a single primary expression wherever it is used.
 */
#define MAXIQ		((int)(PIPE_BUF/(sizeof(FVECT)*2)))
15
16 /* Modifier contribution queue (results waiting to be output) */
17 typedef struct s_binq {
18 RNUMBER ndx; /* index for this entry */
19 RNUMBER nadded; /* accumulated so far */
20 struct s_binq *next; /* next in queue */
21 MODCONT *mca[1]; /* contrib. array (extends struct) */
22 } BINQ;
23
24 static BINQ *out_bq = NULL; /* output bin queue */
25 static BINQ *free_bq = NULL; /* free queue entries */
26
27 static struct {
28 RNUMBER r1; /* assigned ray starting index */
29 SUBPROC pr; /* PID, i/o descriptors */
30 FILE *infp; /* file pointer to read from process */
31 int nr; /* number of rays to sum (0 if free) */
32 } kida[MAXPROCESS]; /* our child processes */
33
34
35 /* Get new bin queue entry */
36 static BINQ *
37 new_binq()
38 {
39 BINQ *bp;
40 int i;
41
42 if (free_bq != NULL) { /* something already available? */
43 bp = free_bq;
44 free_bq = bp->next;
45 bp->next = NULL;
46 bp->nadded = 0;
47 return(bp);
48 }
49 /* else allocate fresh */
50 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
51 if (bp == NULL)
52 goto memerr;
53 for (i = nmods; i--; ) {
54 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
55 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
56 sizeof(DCOLOR)*(mp->nbins-1));
57 if (bp->mca[i] == NULL)
58 goto memerr;
59 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
60 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
61 }
62 bp->ndx = 0;
63 bp->nadded = 0;
64 bp->next = NULL;
65 return(bp);
66 memerr:
67 error(SYSTEM, "out of memory in new_binq()");
68 return(NULL);
69 }
70
71
72 /* Free a bin queue entry */
73 static void
74 free_binq(BINQ *bp)
75 {
76 int i;
77
78 if (bp == NULL) { /* signal to release our free list */
79 while ((bp = free_bq) != NULL) {
80 free_bq = bp->next;
81 for (i = nmods; i--; )
82 free(bp->mca[i]);
83 /* Note: we don't own bp->mca[i]->binv */
84 free(bp);
85 }
86 return;
87 }
88 /* clear sums for next use */
89 /* for (i = nmods; i--; )
90 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
91 */
92 bp->ndx = 0;
93 bp->next = free_bq; /* push onto free list */
94 free_bq = bp;
95 }
96
97
98 /* Add modifier values to accumulation record in queue and clear */
99 static void
100 queue_modifiers()
101 {
102 MODCONT *mpin, *mpout;
103 int i, j;
104
105 if ((accumulate > 0) | (out_bq == NULL))
106 error(CONSISTENCY, "bad call to queue_modifiers()");
107
108 for (i = nmods; i--; ) {
109 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
110 mpout = out_bq->mca[i];
111 for (j = mpout->nbins; j--; )
112 addcolor(mpout->cbin[j], mpin->cbin[j]);
113 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
114 }
115 out_bq->nadded++;
116 }
117
118
119 /* Sum one modifier record into another (updates nadded) */
120 static void
121 add_modbin(BINQ *dst, BINQ *src)
122 {
123 int i, j;
124
125 for (i = nmods; i--; ) {
126 MODCONT *mpin = src->mca[i];
127 MODCONT *mpout = dst->mca[i];
128 for (j = mpout->nbins; j--; )
129 addcolor(mpout->cbin[j], mpin->cbin[j]);
130 }
131 dst->nadded += src->nadded;
132 }
133
134
135 /* Queue values for later output */
136 static void
137 queue_output(BINQ *bp)
138 {
139 BINQ *b_last, *b_cur;
140
141 if (accumulate <= 0) { /* just accumulating? */
142 if (out_bq == NULL) {
143 bp->next = NULL;
144 out_bq = bp;
145 } else {
146 add_modbin(out_bq, bp);
147 free_binq(bp);
148 }
149 return;
150 }
151 b_last = NULL; /* insert in output queue */
152 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
153 b_cur = b_cur->next)
154 b_last = b_cur;
155
156 if (b_last != NULL) {
157 bp->next = b_cur;
158 b_last->next = bp;
159 } else {
160 bp->next = out_bq;
161 out_bq = bp;
162 }
163 if (accumulate == 1) /* no accumulation? */
164 return;
165 b_cur = out_bq; /* else merge accumulation entries */
166 while (b_cur->next != NULL) {
167 if (b_cur->nadded >= accumulate ||
168 (b_cur->ndx-1)/accumulate !=
169 (b_cur->next->ndx-1)/accumulate) {
170 b_cur = b_cur->next;
171 continue;
172 }
173 add_modbin(b_cur, b_cur->next);
174 b_last = b_cur->next;
175 b_cur->next = b_last->next;
176 b_last->next = NULL;
177 free_binq(b_last);
178 }
179 }
180
181
182 /* Count number of records ready for output */
183 static int
184 queue_ready()
185 {
186 int nready = 0;
187 BINQ *bp;
188
189 for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
190 bp->ndx == lastdone+nready*accumulate+1;
191 bp = bp->next)
192 ++nready;
193
194 return(nready);
195 }
196
197
198 /* Catch up with output queue by producing ready results */
199 static int
200 output_catchup(int nmax)
201 {
202 int nout = 0;
203 BINQ *bp;
204 int i;
205 /* output ready results */
206 while (out_bq != NULL && out_bq->nadded >= accumulate
207 && out_bq->ndx == lastdone+1) {
208 if ((nmax > 0) & (nout >= nmax))
209 break;
210 bp = out_bq; /* pop off first entry */
211 out_bq = bp->next;
212 bp->next = NULL;
213 for (i = 0; i < nmods; i++) /* output record */
214 mod_output(bp->mca[i]);
215 end_record();
216 free_binq(bp); /* free this entry */
217 lastdone += accumulate;
218 ++nout;
219 }
220 return(nout);
221 }
222
223
224 /* Put a zero record in results queue & output */
225 void
226 put_zero_record(int ndx)
227 {
228 BINQ *bp = new_binq();
229 int i;
230
231 for (i = nmods; i--; )
232 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
233 bp->ndx = ndx;
234 bp->nadded = 1;
235 queue_output(bp);
236 output_catchup(0);
237 }
238
239
240 /* Get results from child process and add to queue */
241 static void
242 queue_results(int k)
243 {
244 BINQ *bq = new_binq(); /* get results holder */
245 int j;
246
247 bq->ndx = kida[k].r1;
248 bq->nadded = kida[k].nr;
249 /* read from child */
250 for (j = 0; j < nmods; j++)
251 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
252 kida[k].infp) != bq->mca[j]->nbins)
253 error(SYSTEM, "read error from render process");
254
255 queue_output(bq); /* put results in output queue */
256 kida[k].nr = 0; /* mark child as available */
257 }
258
259
260 /* callback to set output spec to NULL (stdout) */
261 static int
262 set_stdout(const LUENT *le, void *p)
263 {
264 (*(MODCONT *)le->data).outspec = NULL;
265 return(0);
266 }
267
268
269 /* Start child processes if we can */
270 int
271 in_rchild()
272 {
273 int rval;
274
275 while (nchild < nproc) { /* fork until target reached */
276 errno = 0;
277 rval = open_process(&kida[nchild].pr, NULL);
278 if (rval < 0)
279 error(SYSTEM, "open_process() call failed");
280 if (rval == 0) { /* if in child, set up & return true */
281 lu_doall(&modconttab, set_stdout, NULL);
282 lu_done(&ofiletab);
283 while (nchild--) { /* don't share other pipes */
284 close(kida[nchild].pr.w);
285 fclose(kida[nchild].infp);
286 }
287 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
288 outfmt = 'd';
289 header = 0;
290 yres = 0;
291 raysleft = 0;
292 if (accumulate == 1) {
293 waitflush = xres = 1;
294 account = accumulate = 1;
295 } else { /* parent controls accumulation */
296 waitflush = xres = 0;
297 account = accumulate = 0;
298 }
299 return(1); /* return "true" in child */
300 }
301 if (rval != PIPE_BUF)
302 error(CONSISTENCY, "bad value from open_process()");
303 /* connect to child's output */
304 kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
305 if (kida[nchild].infp == NULL)
306 error(SYSTEM, "out of memory in in_rchild()");
307 #ifdef getc_unlocked
308 flockfile(kida[nchild].infp); /* avoid mutex overhead */
309 #endif
310 kida[nchild++].nr = 0; /* mark as available */
311 }
312 return(0); /* return "false" in parent */
313 }
314
315
316 /* Close child processes */
317 void
318 end_children(int immed)
319 {
320 int status;
321
322 while (nchild > 0) {
323 nchild--;
324 if (immed) /* error mode -- quick exit */
325 kill(kida[nchild].pr.pid, SIGKILL);
326 if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
327 sprintf(errmsg,
328 "rendering process returned bad status (%d)",
329 status);
330 error(WARNING, errmsg);
331 }
332 fclose(kida[nchild].infp);
333 }
334 }
335
336
337 /* Wait for the next available child, managing output queue simultaneously */
338 static int
339 next_child_nq(int flushing)
340 {
341 static struct timeval polling;
342 struct timeval *pmode;
343 fd_set readset, errset;
344 int i, n, nr, nqr;
345
346 if (!flushing) /* see if there's one free */
347 for (i = nchild; i--; )
348 if (!kida[i].nr)
349 return(i);
350
351 nqr = queue_ready(); /* choose blocking mode or polling */
352 if ((nqr > 0) & !flushing)
353 pmode = &polling;
354 else
355 pmode = NULL;
356 tryagain: /* catch up with output? */
357 if (pmode == &polling) {
358 if (nqr > nchild) /* don't get too far behind */
359 nqr -= output_catchup(nqr-nchild);
360 } else if (nqr > 0) /* clear output before blocking */
361 nqr -= output_catchup(0);
362 /* prepare select() call */
363 FD_ZERO(&readset); FD_ZERO(&errset);
364 n = nr = 0;
365 for (i = nchild; i--; ) {
366 if (kida[i].nr) {
367 FD_SET(kida[i].pr.r, &readset);
368 ++nr;
369 }
370 FD_SET(kida[i].pr.r, &errset);
371 if (kida[i].pr.r >= n)
372 n = kida[i].pr.r + 1;
373 }
374 if (!nr) /* nothing to wait for? */
375 return(-1);
376 if ((nr > 1) | (pmode == &polling)) {
377 errno = 0;
378 nr = select(n, &readset, NULL, &errset, pmode);
379 if (!nr) {
380 pmode = NULL; /* try again, blocking this time */
381 goto tryagain;
382 }
383 if (nr < 0)
384 error(SYSTEM, "select() error in next_child_nq()");
385 } else
386 FD_ZERO(&errset);
387 n = -1; /* read results from child(ren) */
388 for (i = nchild; i--; ) {
389 if (FD_ISSET(kida[i].pr.r, &errset))
390 error(USER, "rendering process died");
391 if (FD_ISSET(kida[i].pr.r, &readset))
392 queue_results(n = i);
393 }
394 return(n); /* first available child */
395 }
396
397
398 /* Run parental oversight loop */
399 void
400 parental_loop()
401 {
402 static int ignore_warning_given = 0;
403 int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
404 int ninq = 0;
405 FVECT orgdir[2*MAXIQ];
406 int i, n;
407 /* load rays from stdin & process */
408 #ifdef getc_unlocked
409 flockfile(stdin); /* avoid lock/unlock overhead */
410 #endif
411 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
412 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
413 (orgdir[2*ninq+1][1] == 0.0) &
414 (orgdir[2*ninq+1][2] == 0.0)) {
415 if (accumulate != 1) {
416 if (!ignore_warning_given++)
417 error(WARNING,
418 "dummy ray(s) ignored during accumulation\n");
419 continue;
420 }
421 while (next_child_nq(1) >= 0)
422 ; /* clear the queue */
423 lastdone = lastray = 0;
424 if ((yres <= 0) | (xres <= 0))
425 waitflush = 1; /* flush next */
426 put_zero_record(++lastray);
427 } else if (++ninq >= qlimit ||
428 lastray/accumulate != (lastray+ninq)/accumulate) {
429 i = next_child_nq(0); /* manages output */
430 n = ninq;
431 if (accumulate != 1) /* request flush? */
432 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
433 n *= sizeof(FVECT)*2; /* send assignment */
434 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
435 error(SYSTEM, "pipe write error");
436 kida[i].r1 = lastray+1;
437 lastray += kida[i].nr = ninq; /* mark as busy */
438 ninq = 0;
439 if (lastray < lastdone) { /* RNUMBER wrapped? */
440 while (next_child_nq(1) >= 0)
441 ;
442 lastdone = lastray = 0;
443 }
444 }
445 if (raysleft && !--raysleft)
446 break; /* preemptive EOI */
447 }
448 while (next_child_nq(1) >= 0) /* empty results queue */
449 ;
450 if (account < accumulate) {
451 error(WARNING, "partial accumulation in final record");
452 free_binq(out_bq); /* XXX just ignore it */
453 out_bq = NULL;
454 }
455 free_binq(NULL); /* clean up */
456 lu_done(&ofiletab);
457 if (raysleft)
458 error(USER, "unexpected EOF on input");
459 }
460
461
462 /* Wait for the next available child by monitoring "to" pipes */
463 static int
464 next_child_ready()
465 {
466 fd_set writeset, errset;
467 int i, n;
468
469 for (i = nchild; i--; ) /* see if there's one free first */
470 if (!kida[i].nr)
471 return(i);
472 /* prepare select() call */
473 FD_ZERO(&writeset); FD_ZERO(&errset);
474 n = 0;
475 for (i = nchild; i--; ) {
476 FD_SET(kida[i].pr.w, &writeset);
477 FD_SET(kida[i].pr.r, &errset);
478 if (kida[i].pr.w >= n)
479 n = kida[i].pr.w + 1;
480 if (kida[i].pr.r >= n)
481 n = kida[i].pr.r + 1;
482 }
483 errno = 0;
484 n = select(n, NULL, &writeset, &errset, NULL);
485 if (n < 0)
486 error(SYSTEM, "select() error in next_child_ready()");
487 n = -1; /* identify waiting child */
488 for (i = nchild; i--; ) {
489 if (FD_ISSET(kida[i].pr.r, &errset))
490 error(USER, "rendering process died");
491 if (FD_ISSET(kida[i].pr.w, &writeset))
492 kida[n = i].nr = 0;
493 }
494 return(n); /* first available child */
495 }
496
497
498 /* Modified parental loop for full accumulation mode (-c 0) */
499 void
500 feeder_loop()
501 {
502 static int ignore_warning_given = 0;
503 int ninq = 0;
504 FVECT orgdir[2*MAXIQ];
505 int i, n;
506 /* load rays from stdin & process */
507 #ifdef getc_unlocked
508 flockfile(stdin); /* avoid lock/unlock overhead */
509 #endif
510 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
511 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
512 (orgdir[2*ninq+1][1] == 0.0) &
513 (orgdir[2*ninq+1][2] == 0.0)) {
514 if (!ignore_warning_given++)
515 error(WARNING,
516 "dummy ray(s) ignored during accumulation\n");
517 continue;
518 }
519 if (++ninq >= MAXIQ) {
520 i = next_child_ready(); /* get eager child */
521 n = sizeof(FVECT)*2 * ninq; /* give assignment */
522 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
523 error(SYSTEM, "pipe write error");
524 kida[i].r1 = lastray+1;
525 lastray += kida[i].nr = ninq;
526 ninq = 0;
527 if (lastray < lastdone) /* RNUMBER wrapped? */
528 lastdone = lastray = 0;
529 }
530 if (raysleft && !--raysleft)
531 break; /* preemptive EOI */
532 }
533 if (ninq) { /* polish off input */
534 i = next_child_ready();
535 n = sizeof(FVECT)*2 * ninq;
536 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
537 error(SYSTEM, "pipe write error");
538 kida[i].r1 = lastray+1;
539 lastray += kida[i].nr = ninq;
540 ninq = 0;
541 }
542 for (i = nchild; i--; ) { /* get results */
543 close(kida[i].pr.w);
544 queue_results(i);
545 }
546 if (recover) /* and from before? */
547 queue_modifiers();
548 end_children(0); /* free up file descriptors */
549 for (i = 0; i < nmods; i++)
550 mod_output(out_bq->mca[i]); /* output accumulated record */
551 end_record();
552 free_binq(out_bq); /* clean up */
553 out_bq = NULL;
554 free_binq(NULL);
555 lu_done(&ofiletab);
556 if (raysleft)
557 error(USER, "unexpected EOF on input");
558 }