ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.7
Committed: Tue Jun 12 22:40:30 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.6: +2 -2 lines
Log Message:
Bug fix in vector normalization

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: rc3.c,v 2.6 2012/06/12 17:20:44 greg Exp $";
3 #endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include "rcontrib.h"
10 #include "platform.h"
11 #include "rtprocess.h"
12 #include "selcall.h"
13
14 /* Modifier contribution queue (results waiting to be output) */
15 typedef struct s_binq {
16 RNUMBER ndx; /* index for this entry */
17 RNUMBER nadded; /* accumulated so far */
18 struct s_binq *next; /* next in queue */
19 MODCONT *mca[1]; /* contrib. array (extends struct) */
20 } BINQ;
21
22 static BINQ *out_bq = NULL; /* output bin queue */
23 static BINQ *free_bq = NULL; /* free queue entries */
24
25 static struct {
26 RNUMBER r1; /* assigned ray starting index */
27 SUBPROC pr; /* PID, i/o descriptors */
28 FILE *infp; /* file pointer to read from process */
29 int nr; /* number rays to sum (0 if free) */
30 } kida[MAXPROCESS]; /* our child processes */
31
32
33 /* Get new bin queue entry */
34 static BINQ *
35 new_binq()
36 {
37 BINQ *bp;
38 int i;
39
40 if (free_bq != NULL) { /* something already available? */
41 bp = free_bq;
42 free_bq = bp->next;
43 bp->next = NULL;
44 bp->nadded = 0;
45 return(bp);
46 }
47 /* else allocate fresh */
48 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
49 if (bp == NULL)
50 goto memerr;
51 for (i = nmods; i--; ) {
52 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
53 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
54 sizeof(DCOLOR)*(mp->nbins-1));
55 if (bp->mca[i] == NULL)
56 goto memerr;
57 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
58 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
59 }
60 bp->ndx = 0;
61 bp->nadded = 0;
62 bp->next = NULL;
63 return(bp);
64 memerr:
65 error(SYSTEM, "out of memory in new_binq()");
66 return(NULL);
67 }
68
69
70 /* Free a bin queue entry */
71 static void
72 free_binq(BINQ *bp)
73 {
74 int i;
75
76 if (bp == NULL) { /* signal to release our free list */
77 while ((bp = free_bq) != NULL) {
78 free_bq = bp->next;
79 for (i = nmods; i--; )
80 free(bp->mca[i]);
81 /* Note: we don't own bp->mca[i]->binv */
82 free(bp);
83 }
84 return;
85 }
86 /* clear sums for next use */
87 /* for (i = nmods; i--; )
88 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
89 */
90 bp->ndx = 0;
91 bp->next = free_bq; /* push onto free list */
92 free_bq = bp;
93 }
94
95
96 /* Add modifier values to accumulation record in queue and clear */
97 void
98 queue_modifiers()
99 {
100 MODCONT *mpin, *mpout;
101 int i, j;
102
103 if ((accumulate > 0) | (out_bq == NULL))
104 error(CONSISTENCY, "bad call to queue_modifiers()");
105
106 for (i = nmods; i--; ) {
107 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
108 mpout = out_bq->mca[i];
109 for (j = mpout->nbins; j--; )
110 addcolor(mpout->cbin[j], mpin->cbin[j]);
111 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
112 }
113 out_bq->nadded++;
114 }
115
116
117 /* Sum one modifier record into another (updates nadded) */
118 static void
119 add_modbin(BINQ *dst, BINQ *src)
120 {
121 int i, j;
122
123 for (i = nmods; i--; ) {
124 MODCONT *mpin = src->mca[i];
125 MODCONT *mpout = dst->mca[i];
126 for (j = mpout->nbins; j--; )
127 addcolor(mpout->cbin[j], mpin->cbin[j]);
128 }
129 dst->nadded += src->nadded;
130 }
131
132
133 /* Queue values for later output */
134 static void
135 queue_output(BINQ *bp)
136 {
137 BINQ *b_last, *b_cur;
138
139 if (accumulate <= 0) { /* just accumulating? */
140 if (out_bq == NULL) {
141 bp->next = NULL;
142 out_bq = bp;
143 } else {
144 add_modbin(out_bq, bp);
145 free_binq(bp);
146 }
147 return;
148 }
149 b_last = NULL; /* else insert in output queue */
150 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
151 b_cur = b_cur->next)
152 b_last = b_cur;
153
154 if (b_last != NULL) {
155 bp->next = b_cur;
156 b_last->next = bp;
157 } else {
158 bp->next = out_bq;
159 out_bq = bp;
160 }
161 if (accumulate == 1) /* no accumulation? */
162 return;
163 b_cur = out_bq; /* else merge accumulation entries */
164 while (b_cur->next != NULL) {
165 if (b_cur->nadded >= accumulate ||
166 (b_cur->ndx-1)/accumulate !=
167 (b_cur->next->ndx-1)/accumulate) {
168 b_cur = b_cur->next;
169 continue;
170 }
171 add_modbin(b_cur, b_cur->next);
172 b_last = b_cur->next;
173 b_cur->next = b_last->next;
174 b_last->next = NULL;
175 free_binq(b_last);
176 }
177 }
178
179
180 /* Count number of records ready for output */
181 static int
182 queue_ready()
183 {
184 int nready = 0;
185 BINQ *bp;
186
187 if (accumulate <= 0) /* just accumulating? */
188 return(0);
189
190 for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
191 bp->ndx == lastdone+nready*accumulate+1;
192 bp = bp->next)
193 ++nready;
194
195 return(nready);
196 }
197
198
199 /* Catch up with output queue by producing ready results */
200 static int
201 output_catchup(int nmax)
202 {
203 int nout = 0;
204 BINQ *bp;
205 int i;
206
207 if (accumulate <= 0) /* just accumulating? */
208 return(0);
209 /* else output ready results */
210 while (out_bq != NULL && out_bq->nadded >= accumulate
211 && out_bq->ndx == lastdone+1) {
212 if ((nmax > 0) & (nout >= nmax))
213 break;
214 bp = out_bq; /* pop off first entry */
215 out_bq = bp->next;
216 bp->next = NULL;
217 for (i = 0; i < nmods; i++) /* output record */
218 mod_output(bp->mca[i]);
219 end_record();
220 free_binq(bp); /* free this entry */
221 lastdone += accumulate;
222 ++nout;
223 }
224 return(nout);
225 }
226
227
228 /* Put a zero record in results queue & output */
229 void
230 put_zero_record(int ndx)
231 {
232 BINQ *bp = new_binq();
233 int i;
234
235 for (i = nmods; i--; )
236 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
237 bp->ndx = ndx;
238 bp->nadded = 1;
239 queue_output(bp);
240 output_catchup(0);
241 }
242
243
244 /* Get results from child process and add to queue */
245 static void
246 queue_results(int k)
247 {
248 BINQ *bq = new_binq(); /* get results holder */
249 int j;
250
251 bq->ndx = kida[k].r1;
252 bq->nadded = kida[k].nr;
253 /* read from child */
254 for (j = 0; j < nmods; j++)
255 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
256 kida[k].infp) != bq->mca[j]->nbins)
257 error(SYSTEM, "read error from render process");
258
259 queue_output(bq); /* put results in output queue */
260 kida[k].nr = 0; /* mark child as available */
261 }
262
263
264 /* callback to set output spec to NULL (stdout) */
265 static int
266 set_stdout(const LUENT *le, void *p)
267 {
268 (*(MODCONT *)le->data).outspec = NULL;
269 return(0);
270 }
271
272
273 /* Start child processes if we can */
274 int
275 in_rchild()
276 {
277 #ifdef _WIN32
278 error(WARNING, "multiprocessing unsupported -- running solo");
279 nproc = 1;
280 return(1);
281 #else
282 /* try to fork ourselves */
283 while (nchild < nproc) {
284 int p0[2], p1[2];
285 int pid;
286 /* prepare i/o pipes */
287 errno = 0;
288 if (pipe(p0) < 0 || pipe(p1) < 0)
289 error(SYSTEM, "pipe() call failed!");
290 pid = fork(); /* fork parent process */
291 if (pid == 0) { /* if in child, set up & return true */
292 close(p0[1]); close(p1[0]);
293 lu_doall(&modconttab, set_stdout, NULL);
294 lu_done(&ofiletab);
295 while (nchild--) { /* don't share other pipes */
296 close(kida[nchild].pr.w);
297 fclose(kida[nchild].infp);
298 }
299 dup2(p0[0], 0); close(p0[0]);
300 dup2(p1[1], 1); close(p1[1]);
301 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
302 outfmt = 'd';
303 header = 0;
304 yres = 0;
305 raysleft = 0;
306 if (accumulate == 1) {
307 waitflush = xres = 1;
308 account = accumulate = 1;
309 } else { /* parent controls accumulation */
310 waitflush = xres = 0;
311 account = accumulate = 0;
312 }
313 return(1); /* child return value */
314 }
315 if (pid < 0)
316 error(SYSTEM, "fork() call failed!");
317 /* connect parent's pipes */
318 close(p0[0]); close(p1[1]);
319 kida[nchild].pr.r = p1[0];
320 kida[nchild].pr.w = p0[1];
321 kida[nchild].pr.pid = pid;
322 kida[nchild].pr.running = 1;
323 kida[nchild].infp = fdopen(p1[0], "rb");
324 if (kida[nchild].infp == NULL)
325 error(SYSTEM, "out of memory in in_rchild()");
326 #ifdef getc_unlocked
327 flockfile(kida[nchild].infp); /* avoid mutex overhead */
328 #endif
329 kida[nchild++].nr = 0; /* mark as available */
330 }
331 return(0); /* parent return value */
332 #endif
333 }
334
335
336 /* Close child processes */
337 void
338 end_children()
339 {
340 int status;
341
342 while (nchild > 0) {
343 nchild--;
344 fclose(kida[nchild].infp);
345 kida[nchild].pr.r = -1; /* close(-1) error is ignored */
346 if ((status = close_process(&kida[nchild].pr)) > 0) {
347 sprintf(errmsg,
348 "rendering process returned bad status (%d)",
349 status);
350 error(WARNING, errmsg);
351 }
352 }
353 }
354
355
356 /* Wait for the next available child, managing output queue simultaneously */
357 static int
358 next_child_nq(int flushing)
359 {
360 static struct timeval polling;
361 struct timeval *pmode;
362 fd_set readset, errset;
363 int i, n, nr, nqr;
364
365 if (!flushing) /* see if there's one free */
366 for (i = nchild; i--; )
367 if (!kida[i].nr)
368 return(i);
369
370 nqr = queue_ready(); /* choose blocking mode or polling */
371 if ((nqr > 0) & !flushing)
372 pmode = &polling;
373 else
374 pmode = NULL;
375 tryagain: /* catch up with output? */
376 if (pmode == &polling) {
377 if (nqr > nchild) /* don't get too far behind */
378 nqr -= output_catchup(nqr-nchild);
379 } else if (nqr > 0) /* clear output before blocking */
380 nqr -= output_catchup(0);
381 /* prepare select() call */
382 FD_ZERO(&readset); FD_ZERO(&errset);
383 n = nr = 0;
384 for (i = nchild; i--; ) {
385 if (kida[i].nr) {
386 FD_SET(kida[i].pr.r, &readset);
387 ++nr;
388 }
389 FD_SET(kida[i].pr.r, &errset);
390 if (kida[i].pr.r >= n)
391 n = kida[i].pr.r + 1;
392 }
393 if (!nr) /* nothing to wait for? */
394 return(-1);
395 if ((nr > 1) | (pmode == &polling)) {
396 errno = 0;
397 nr = select(n, &readset, NULL, &errset, pmode);
398 if (!nr) {
399 pmode = NULL; /* try again, blocking this time */
400 goto tryagain;
401 }
402 if (nr < 0)
403 error(SYSTEM, "select() error in next_child_nq()");
404 } else
405 FD_ZERO(&errset);
406 n = -1; /* read results from child(ren) */
407 for (i = nchild; i--; ) {
408 if (FD_ISSET(kida[i].pr.r, &errset))
409 error(USER, "rendering process died");
410 if (FD_ISSET(kida[i].pr.r, &readset))
411 queue_results(n = i);
412 }
413 return(n); /* first available child */
414 }
415
416
417 /* Run parental oversight loop */
418 void
419 parental_loop()
420 {
421 #define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2))
422 static int ignore_warning_given = 0;
423 int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
424 int ninq = 0;
425 FVECT orgdir[2*MAXIQ];
426 double d;
427 int i, n;
428 /* load rays from stdin & process */
429 #ifdef getc_unlocked
430 flockfile(stdin); /* avoid lock/unlock overhead */
431 #endif
432 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
433 d = normalize(orgdir[2*ninq+1]);
434 if (d == 0.0) { /* asking for flush? */
435 if (accumulate != 1) {
436 if (!ignore_warning_given++)
437 error(WARNING,
438 "dummy ray(s) ignored during accumulation\n");
439 continue;
440 }
441 while (next_child_nq(1) >= 0)
442 ; /* clear the queue */
443 lastdone = lastray = 0;
444 if ((yres <= 0) | (xres <= 0))
445 waitflush = 1; /* flush next */
446 put_zero_record(++lastray);
447 } else if (++ninq >= qlimit || accumulate > 1 &&
448 lastray/accumulate != (lastray+ninq)/accumulate) {
449 i = next_child_nq(0); /* manages output */
450 n = ninq;
451 if (accumulate != 1) /* request flush? */
452 memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
453 n *= sizeof(FVECT)*2; /* send assignment */
454 if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
455 error(SYSTEM, "pipe write error");
456 kida[i].r1 = lastray+1;
457 lastray += kida[i].nr = ninq; /* mark as busy */
458 ninq = 0;
459 if (lastray < lastdone) { /* RNUMBER wrapped? */
460 while (next_child_nq(1) >= 0)
461 ;
462 lastdone = lastray = 0;
463 }
464 }
465 if (raysleft && !--raysleft)
466 break; /* preemptive EOI */
467 }
468 while (next_child_nq(1) >= 0) /* empty results queue */
469 ;
470 /* output accumulated record */
471 if (accumulate <= 0 || account < accumulate) {
472 end_children(); /* frees up file descriptors */
473 if (account < accumulate) {
474 error(WARNING, "partial accumulation in final record");
475 accumulate -= account;
476 }
477 for (i = 0; i < nmods; i++)
478 mod_output(out_bq->mca[i]);
479 end_record();
480 free_binq(out_bq);
481 out_bq = NULL;
482 }
483 if (raysleft)
484 error(USER, "unexpected EOF on input");
485 free_binq(NULL); /* clean up */
486 lu_done(&ofiletab);
487 #undef MAXIQ
488 }