ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.20
Committed: Sun Aug 23 00:17:12 2015 UTC (8 years, 9 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad5R0
Changes since 2.19: +6 -5 lines
Log Message:
Minor tweaks & performance improvements for rcontrib file locking

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: rc3.c,v 2.19 2015/08/21 18:21:05 greg Exp $";
3 #endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include <signal.h>
10 #include "rcontrib.h"
11 #include "selcall.h"
12
/* Number of FVECT pairs that fit in PIPE_BUF bytes (whole expansion
 * is parenthesized so it is safe next to any operator) */
#define MAXIQ		((int)(PIPE_BUF/(sizeof(FVECT)*2)))
14
15 /* Modifier contribution queue (results waiting to be output) */
16 typedef struct s_binq {
17 RNUMBER ndx; /* index for this entry */
18 RNUMBER nadded; /* accumulated so far */
19 struct s_binq *next; /* next in queue */
20 MODCONT *mca[1]; /* contrib. array (extends struct) */
21 } BINQ;
22
23 static BINQ *out_bq = NULL; /* output bin queue */
24 static BINQ *free_bq = NULL; /* free queue entries */
25
26 static struct {
27 RNUMBER r1; /* assigned ray starting index */
28 SUBPROC pr; /* PID, i/o descriptors */
29 FILE *infp; /* file pointer to read from process */
30 int nr; /* number of rays to sum (0 if free) */
31 } kida[MAXPROCESS]; /* our child processes */
32
33
34 /* Get new bin queue entry */
35 static BINQ *
36 new_binq()
37 {
38 BINQ *bp;
39 int i;
40
41 if (free_bq != NULL) { /* something already available? */
42 bp = free_bq;
43 free_bq = bp->next;
44 bp->next = NULL;
45 bp->nadded = 0;
46 return(bp);
47 }
48 /* else allocate fresh */
49 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
50 if (bp == NULL)
51 goto memerr;
52 for (i = nmods; i--; ) {
53 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
54 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
55 sizeof(DCOLOR)*(mp->nbins-1));
56 if (bp->mca[i] == NULL)
57 goto memerr;
58 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
59 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
60 }
61 bp->ndx = 0;
62 bp->nadded = 0;
63 bp->next = NULL;
64 return(bp);
65 memerr:
66 error(SYSTEM, "out of memory in new_binq()");
67 return(NULL);
68 }
69
70
71 /* Free a bin queue entry */
72 static void
73 free_binq(BINQ *bp)
74 {
75 int i;
76
77 if (bp == NULL) { /* signal to release our free list */
78 while ((bp = free_bq) != NULL) {
79 free_bq = bp->next;
80 for (i = nmods; i--; )
81 free(bp->mca[i]);
82 /* Note: we don't own bp->mca[i]->binv */
83 free(bp);
84 }
85 return;
86 }
87 /* clear sums for next use */
88 /* for (i = nmods; i--; )
89 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
90 */
91 bp->ndx = 0;
92 bp->next = free_bq; /* push onto free list */
93 free_bq = bp;
94 }
95
96
97 /* Add modifier values to accumulation record in queue and clear */
98 static void
99 queue_modifiers()
100 {
101 MODCONT *mpin, *mpout;
102 int i, j;
103
104 if ((accumulate > 0) | (out_bq == NULL))
105 error(CONSISTENCY, "bad call to queue_modifiers()");
106
107 for (i = nmods; i--; ) {
108 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
109 mpout = out_bq->mca[i];
110 for (j = mpout->nbins; j--; )
111 addcolor(mpout->cbin[j], mpin->cbin[j]);
112 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
113 }
114 out_bq->nadded++;
115 }
116
117
118 /* Sum one modifier record into another (updates nadded) */
119 static void
120 add_modbin(BINQ *dst, BINQ *src)
121 {
122 int i, j;
123
124 for (i = nmods; i--; ) {
125 MODCONT *mpin = src->mca[i];
126 MODCONT *mpout = dst->mca[i];
127 for (j = mpout->nbins; j--; )
128 addcolor(mpout->cbin[j], mpin->cbin[j]);
129 }
130 dst->nadded += src->nadded;
131 }
132
133
134 /* Queue values for later output */
135 static void
136 queue_output(BINQ *bp)
137 {
138 BINQ *b_last, *b_cur;
139
140 if (accumulate <= 0) { /* just accumulating? */
141 if (out_bq == NULL) {
142 bp->next = NULL;
143 out_bq = bp;
144 } else {
145 add_modbin(out_bq, bp);
146 free_binq(bp);
147 }
148 return;
149 }
150 b_last = NULL; /* insert in output queue */
151 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
152 b_cur = b_cur->next)
153 b_last = b_cur;
154
155 if (b_last != NULL) {
156 bp->next = b_cur;
157 b_last->next = bp;
158 } else {
159 bp->next = out_bq;
160 out_bq = bp;
161 }
162 if (accumulate == 1) /* no accumulation? */
163 return;
164 b_cur = out_bq; /* else merge accumulation entries */
165 while (b_cur->next != NULL) {
166 if (b_cur->nadded >= accumulate ||
167 (b_cur->ndx-1)/accumulate !=
168 (b_cur->next->ndx-1)/accumulate) {
169 b_cur = b_cur->next;
170 continue;
171 }
172 add_modbin(b_cur, b_cur->next);
173 b_last = b_cur->next;
174 b_cur->next = b_last->next;
175 b_last->next = NULL;
176 free_binq(b_last);
177 }
178 }
179
180
181 /* Count number of records ready for output */
182 static int
183 queue_ready()
184 {
185 int nready = 0;
186 BINQ *bp;
187
188 for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
189 bp->ndx == lastdone+nready*accumulate+1;
190 bp = bp->next)
191 ++nready;
192
193 return(nready);
194 }
195
196
197 /* Catch up with output queue by producing ready results */
198 static int
199 output_catchup(int nmax)
200 {
201 int nout = 0;
202 BINQ *bp;
203 int i;
204 /* output ready results */
205 while (out_bq != NULL && out_bq->nadded >= accumulate
206 && out_bq->ndx == lastdone+1) {
207 if ((nmax > 0) & (nout >= nmax))
208 break;
209 bp = out_bq; /* pop off first entry */
210 out_bq = bp->next;
211 bp->next = NULL;
212 for (i = 0; i < nmods; i++) /* output record */
213 mod_output(bp->mca[i]);
214 end_record();
215 free_binq(bp); /* free this entry */
216 lastdone += accumulate;
217 ++nout;
218 }
219 return(nout);
220 }
221
222
223 /* Put a zero record in results queue & output */
224 void
225 put_zero_record(int ndx)
226 {
227 BINQ *bp = new_binq();
228 int i;
229
230 for (i = nmods; i--; )
231 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
232 bp->ndx = ndx;
233 bp->nadded = 1;
234 queue_output(bp);
235 output_catchup(0);
236 }
237
238
239 /* Get results from child process and add to queue */
240 static void
241 queue_results(int k)
242 {
243 BINQ *bq = new_binq(); /* get results holder */
244 int j;
245
246 bq->ndx = kida[k].r1;
247 bq->nadded = kida[k].nr;
248 /* read from child */
249 for (j = 0; j < nmods; j++)
250 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
251 kida[k].infp) != bq->mca[j]->nbins)
252 error(SYSTEM, "read error from render process");
253
254 queue_output(bq); /* put results in output queue */
255 kida[k].nr = 0; /* mark child as available */
256 }
257
258
259 /* callback to set output spec to NULL (stdout) */
260 static int
261 set_stdout(const LUENT *le, void *p)
262 {
263 (*(MODCONT *)le->data).outspec = NULL;
264 return(0);
265 }
266
267
268 /* Start child processes if we can (call only once in parent!) */
269 int
270 in_rchild()
271 {
272 int rval;
273
274 while (nchild < nproc) { /* fork until target reached */
275 errno = 0;
276 rval = open_process(&kida[nchild].pr, NULL);
277 if (rval < 0)
278 error(SYSTEM, "open_process() call failed");
279 if (rval == 0) { /* if in child, set up & return true */
280 lu_doall(&modconttab, &set_stdout, NULL);
281 lu_done(&ofiletab);
282 while (nchild--) { /* don't share other pipes */
283 close(kida[nchild].pr.w);
284 fclose(kida[nchild].infp);
285 }
286 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
287 outfmt = 'd';
288 header = 0;
289 yres = 0;
290 raysleft = 0;
291 if (accumulate == 1) {
292 waitflush = xres = 1;
293 account = accumulate = 1;
294 } else { /* parent controls accumulation */
295 waitflush = xres = 0;
296 account = accumulate = 0;
297 }
298 return(1); /* return "true" in child */
299 }
300 if (rval != PIPE_BUF)
301 error(CONSISTENCY, "bad value from open_process()");
302 /* connect to child's output */
303 kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
304 if (kida[nchild].infp == NULL)
305 error(SYSTEM, "out of memory in in_rchild()");
306 kida[nchild++].nr = 0; /* mark as available */
307 }
308 #ifdef getc_unlocked
309 for (rval = nchild; rval--; ) /* avoid mutex overhead */
310 flockfile(kida[rval].infp);
311 #endif
312 return(0); /* return "false" in parent */
313 }
314
315
316 /* Close child processes */
317 void
318 end_children(int immed)
319 {
320 int status;
321
322 while (nchild > 0) {
323 nchild--;
324 #ifdef SIGKILL
325 if (immed) /* error mode -- quick exit */
326 kill(kida[nchild].pr.pid, SIGKILL);
327 #endif
328 if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
329 sprintf(errmsg,
330 "rendering process returned bad status (%d)",
331 status);
332 error(WARNING, errmsg);
333 }
334 fclose(kida[nchild].infp);
335 }
336 }
337
338
339 /* Wait for the next available child, managing output queue simultaneously */
340 static int
341 next_child_nq(int flushing)
342 {
343 static struct timeval polling;
344 struct timeval *pmode;
345 fd_set readset, errset;
346 int i, n, nr, nqr;
347
348 if (!flushing) /* see if there's one free */
349 for (i = nchild; i--; )
350 if (!kida[i].nr)
351 return(i);
352
353 nqr = queue_ready(); /* choose blocking mode or polling */
354 if ((nqr > 0) & !flushing)
355 pmode = &polling;
356 else
357 pmode = NULL;
358 tryagain: /* catch up with output? */
359 if (pmode == &polling) {
360 if (nqr > nchild) /* don't get too far behind */
361 nqr -= output_catchup(nqr-nchild);
362 } else if (nqr > 0) /* clear output before blocking */
363 nqr -= output_catchup(0);
364 /* prepare select() call */
365 FD_ZERO(&readset); FD_ZERO(&errset);
366 n = nr = 0;
367 for (i = nchild; i--; ) {
368 if (kida[i].nr) {
369 FD_SET(kida[i].pr.r, &readset);
370 ++nr;
371 }
372 FD_SET(kida[i].pr.r, &errset);
373 if (kida[i].pr.r >= n)
374 n = kida[i].pr.r + 1;
375 }
376 if (!nr) /* nothing to wait for? */
377 return(-1);
378 if ((nr > 1) | (pmode == &polling)) {
379 errno = 0;
380 nr = select(n, &readset, NULL, &errset, pmode);
381 if (!nr) {
382 pmode = NULL; /* try again, blocking this time */
383 goto tryagain;
384 }
385 if (nr < 0)
386 error(SYSTEM, "select() error in next_child_nq()");
387 } else
388 FD_ZERO(&errset);
389 n = -1; /* read results from child(ren) */
390 for (i = nchild; i--; ) {
391 if (FD_ISSET(kida[i].pr.r, &errset))
392 error(USER, "rendering process died");
393 if (FD_ISSET(kida[i].pr.r, &readset))
394 queue_results(n = i);
395 }
396 return(n); /* first available child */
397 }
398
399
400 /* Run parental oversight loop */
401 void
402 parental_loop()
403 {
404 const int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
405 int ninq = 0;
406 FVECT orgdir[2*MAXIQ];
407 int i, n;
408 /* load rays from stdin & process */
409 #ifdef getc_unlocked
410 flockfile(stdin); /* avoid lock/unlock overhead */
411 #endif
412 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
413 const int zero_ray = orgdir[2*ninq+1][0] == 0.0 &&
414 (orgdir[2*ninq+1][1] == 0.0) &
415 (orgdir[2*ninq+1][2] == 0.0);
416 ninq += !zero_ray;
417 /* Zero ray cannot go in input queue */
418 if (zero_ray ? ninq : ninq >= qlimit ||
419 lastray/accumulate != (lastray+ninq)/accumulate) {
420 i = next_child_nq(0); /* manages output */
421 n = ninq;
422 if (accumulate > 1) /* need terminator? */
423 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
424 n *= sizeof(FVECT)*2; /* send assignment */
425 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
426 error(SYSTEM, "pipe write error");
427 kida[i].r1 = lastray+1;
428 lastray += kida[i].nr = ninq; /* mark as busy */
429 if (lastray < lastdone) { /* RNUMBER wrapped? */
430 while (next_child_nq(1) >= 0)
431 ;
432 lastray -= ninq;
433 lastdone = lastray %= accumulate;
434 }
435 ninq = 0;
436 }
437 if (zero_ray) { /* put bogus record? */
438 if ((yres <= 0) | (xres <= 1) &&
439 (lastray+1) % accumulate == 0) {
440 while (next_child_nq(1) >= 0)
441 ; /* clear the queue */
442 lastdone = lastray = accumulate-1;
443 waitflush = 1; /* flush next */
444 }
445 put_zero_record(++lastray);
446 }
447 if (raysleft && !--raysleft)
448 break; /* preemptive EOI */
449 }
450 while (next_child_nq(1) >= 0) /* empty results queue */
451 ;
452 if (account < accumulate) {
453 error(WARNING, "partial accumulation in final record");
454 free_binq(out_bq); /* XXX just ignore it */
455 out_bq = NULL;
456 }
457 free_binq(NULL); /* clean up */
458 lu_done(&ofiletab);
459 if (raysleft)
460 error(USER, "unexpected EOF on input");
461 }
462
463
464 /* Wait for the next available child by monitoring "to" pipes */
465 static int
466 next_child_ready()
467 {
468 fd_set writeset, errset;
469 int i, n;
470
471 for (i = nchild; i--; ) /* see if there's one free first */
472 if (!kida[i].nr)
473 return(i);
474 /* prepare select() call */
475 FD_ZERO(&writeset); FD_ZERO(&errset);
476 n = 0;
477 for (i = nchild; i--; ) {
478 FD_SET(kida[i].pr.w, &writeset);
479 FD_SET(kida[i].pr.r, &errset);
480 if (kida[i].pr.w >= n)
481 n = kida[i].pr.w + 1;
482 if (kida[i].pr.r >= n)
483 n = kida[i].pr.r + 1;
484 }
485 errno = 0;
486 n = select(n, NULL, &writeset, &errset, NULL);
487 if (n < 0)
488 error(SYSTEM, "select() error in next_child_ready()");
489 n = -1; /* identify waiting child */
490 for (i = nchild; i--; ) {
491 if (FD_ISSET(kida[i].pr.r, &errset))
492 error(USER, "rendering process died");
493 if (FD_ISSET(kida[i].pr.w, &writeset))
494 kida[n = i].nr = 0;
495 }
496 return(n); /* first available child */
497 }
498
499
500 /* Modified parental loop for full accumulation mode (-c 0) */
501 void
502 feeder_loop()
503 {
504 static int ignore_warning_given = 0;
505 int ninq = 0;
506 FVECT orgdir[2*MAXIQ];
507 int i, n;
508 /* load rays from stdin & process */
509 #ifdef getc_unlocked
510 flockfile(stdin); /* avoid lock/unlock overhead */
511 #endif
512 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
513 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
514 (orgdir[2*ninq+1][1] == 0.0) &
515 (orgdir[2*ninq+1][2] == 0.0)) {
516 if (!ignore_warning_given++)
517 error(WARNING,
518 "dummy ray(s) ignored during accumulation\n");
519 continue;
520 }
521 if (++ninq >= MAXIQ) {
522 i = next_child_ready(); /* get eager child */
523 n = sizeof(FVECT)*2 * ninq; /* give assignment */
524 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
525 error(SYSTEM, "pipe write error");
526 kida[i].r1 = lastray+1;
527 lastray += kida[i].nr = ninq;
528 if (lastray < lastdone) /* RNUMBER wrapped? */
529 lastdone = lastray = 0;
530 ninq = 0;
531 }
532 if (raysleft && !--raysleft)
533 break; /* preemptive EOI */
534 }
535 if (ninq) { /* polish off input */
536 i = next_child_ready();
537 n = sizeof(FVECT)*2 * ninq;
538 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
539 error(SYSTEM, "pipe write error");
540 kida[i].r1 = lastray+1;
541 lastray += kida[i].nr = ninq;
542 ninq = 0;
543 }
544 memset(orgdir, 0, sizeof(FVECT)*2); /* get results */
545 for (i = nchild; i--; ) {
546 writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
547 queue_results(i);
548 }
549 if (recover) /* and from before? */
550 queue_modifiers();
551 end_children(0); /* free up file descriptors */
552 for (i = 0; i < nmods; i++)
553 mod_output(out_bq->mca[i]); /* output accumulated record */
554 end_record();
555 free_binq(out_bq); /* clean up */
556 out_bq = NULL;
557 free_binq(NULL);
558 lu_done(&ofiletab);
559 if (raysleft)
560 error(USER, "unexpected EOF on input");
561 }