ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.8
Committed: Tue Jun 12 22:46:45 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.7: +4 -4 lines
Log Message:
Fix for -ld+ option

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.8 static const char RCSid[] = "$Id: rc3.c,v 2.7 2012/06/12 22:40:30 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * Accumulate ray contributions for a set of materials
6     * Controlling process for multiple children
7     */
8    
9     #include "rcontrib.h"
10     #include "platform.h"
11     #include "rtprocess.h"
12     #include "selcall.h"
13    
14     /* Modifier contribution queue (results waiting to be output) */
15     typedef struct s_binq {
16 greg 2.6 RNUMBER ndx; /* index for this entry */
17     RNUMBER nadded; /* accumulated so far */
18 greg 2.1 struct s_binq *next; /* next in queue */
19     MODCONT *mca[1]; /* contrib. array (extends struct) */
20     } BINQ;
21    
22     static BINQ *out_bq = NULL; /* output bin queue */
23     static BINQ *free_bq = NULL; /* free queue entries */
24    
25 greg 2.6 static struct {
26     RNUMBER r1; /* assigned ray starting index */
27     SUBPROC pr; /* PID, i/o descriptors */
28     FILE *infp; /* file pointer to read from process */
29     int nr; /* number rays to sum (0 if free) */
30     } kida[MAXPROCESS]; /* our child processes */
31 greg 2.1
32    
33 greg 2.4 /* Get new bin queue entry */
34 greg 2.1 static BINQ *
35     new_binq()
36     {
37 greg 2.4 BINQ *bp;
38 greg 2.1 int i;
39    
40 greg 2.4 if (free_bq != NULL) { /* something already available? */
41     bp = free_bq;
42 greg 2.1 free_bq = bp->next;
43     bp->next = NULL;
44 greg 2.6 bp->nadded = 0;
45 greg 2.1 return(bp);
46     }
47     /* else allocate fresh */
48 greg 2.4 bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
49 greg 2.1 if (bp == NULL)
50     goto memerr;
51     for (i = nmods; i--; ) {
52     MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
53     bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
54     sizeof(DCOLOR)*(mp->nbins-1));
55     if (bp->mca[i] == NULL)
56     goto memerr;
57     memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
58     /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
59     }
60     bp->ndx = 0;
61 greg 2.6 bp->nadded = 0;
62 greg 2.1 bp->next = NULL;
63     return(bp);
64     memerr:
65     error(SYSTEM, "out of memory in new_binq()");
66     return(NULL);
67     }
68    
69    
70     /* Free a bin queue entry */
71     static void
72     free_binq(BINQ *bp)
73     {
74     int i;
75    
76     if (bp == NULL) { /* signal to release our free list */
77     while ((bp = free_bq) != NULL) {
78     free_bq = bp->next;
79     for (i = nmods; i--; )
80     free(bp->mca[i]);
81     /* Note: we don't own bp->mca[i]->binv */
82     free(bp);
83     }
84     return;
85     }
86     /* clear sums for next use */
87     /* for (i = nmods; i--; )
88     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
89     */
90     bp->ndx = 0;
91     bp->next = free_bq; /* push onto free list */
92     free_bq = bp;
93     }
94    
95    
96     /* Add modifier values to accumulation record in queue and clear */
97     void
98     queue_modifiers()
99     {
100     MODCONT *mpin, *mpout;
101     int i, j;
102    
103     if ((accumulate > 0) | (out_bq == NULL))
104     error(CONSISTENCY, "bad call to queue_modifiers()");
105    
106     for (i = nmods; i--; ) {
107     mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
108     mpout = out_bq->mca[i];
109     for (j = mpout->nbins; j--; )
110     addcolor(mpout->cbin[j], mpin->cbin[j]);
111     memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
112     }
113 greg 2.5 out_bq->nadded++;
114 greg 2.1 }
115    
116    
117 greg 2.4 /* Sum one modifier record into another (updates nadded) */
118 greg 2.1 static void
119     add_modbin(BINQ *dst, BINQ *src)
120     {
121     int i, j;
122    
123     for (i = nmods; i--; ) {
124     MODCONT *mpin = src->mca[i];
125     MODCONT *mpout = dst->mca[i];
126     for (j = mpout->nbins; j--; )
127     addcolor(mpout->cbin[j], mpin->cbin[j]);
128     }
129 greg 2.4 dst->nadded += src->nadded;
130 greg 2.1 }
131    
132    
133 greg 2.2 /* Queue values for later output */
134     static void
135 greg 2.1 queue_output(BINQ *bp)
136     {
137     BINQ *b_last, *b_cur;
138    
139     if (accumulate <= 0) { /* just accumulating? */
140     if (out_bq == NULL) {
141     bp->next = NULL;
142     out_bq = bp;
143     } else {
144     add_modbin(out_bq, bp);
145     free_binq(bp);
146     }
147 greg 2.2 return;
148 greg 2.1 }
149     b_last = NULL; /* else insert in output queue */
150     for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
151     b_cur = b_cur->next)
152     b_last = b_cur;
153 greg 2.4
154 greg 2.1 if (b_last != NULL) {
155     bp->next = b_cur;
156     b_last->next = bp;
157     } else {
158     bp->next = out_bq;
159     out_bq = bp;
160     }
161 greg 2.3 if (accumulate == 1) /* no accumulation? */
162 greg 2.2 return;
163     b_cur = out_bq; /* else merge accumulation entries */
164     while (b_cur->next != NULL) {
165 greg 2.4 if (b_cur->nadded >= accumulate ||
166 greg 2.2 (b_cur->ndx-1)/accumulate !=
167     (b_cur->next->ndx-1)/accumulate) {
168     b_cur = b_cur->next;
169     continue;
170 greg 2.1 }
171 greg 2.2 add_modbin(b_cur, b_cur->next);
172     b_last = b_cur->next;
173     b_cur->next = b_last->next;
174     b_last->next = NULL;
175     free_binq(b_last);
176 greg 2.1 }
177 greg 2.2 }
178    
179    
180 greg 2.4 /* Count number of records ready for output */
181     static int
182     queue_ready()
183     {
184     int nready = 0;
185     BINQ *bp;
186    
187     if (accumulate <= 0) /* just accumulating? */
188     return(0);
189    
190     for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
191     bp->ndx == lastdone+nready*accumulate+1;
192     bp = bp->next)
193     ++nready;
194    
195     return(nready);
196     }
197    
198    
199     /* Catch up with output queue by producing ready results */
200 greg 2.2 static int
201 greg 2.4 output_catchup(int nmax)
202 greg 2.2 {
203     int nout = 0;
204     BINQ *bp;
205     int i;
206    
207     if (accumulate <= 0) /* just accumulating? */
208     return(0);
209     /* else output ready results */
210 greg 2.4 while (out_bq != NULL && out_bq->nadded >= accumulate
211     && out_bq->ndx == lastdone+1) {
212     if ((nmax > 0) & (nout >= nmax))
213     break;
214 greg 2.2 bp = out_bq; /* pop off first entry */
215     out_bq = bp->next;
216     bp->next = NULL;
217 greg 2.1 for (i = 0; i < nmods; i++) /* output record */
218 greg 2.2 mod_output(bp->mca[i]);
219 greg 2.1 end_record();
220 greg 2.2 free_binq(bp); /* free this entry */
221 greg 2.1 lastdone += accumulate;
222     ++nout;
223     }
224     return(nout);
225     }
226    
227    
228 greg 2.3 /* Put a zero record in results queue & output */
229 greg 2.1 void
230 greg 2.3 put_zero_record(int ndx)
231 greg 2.1 {
232     BINQ *bp = new_binq();
233     int i;
234    
235     for (i = nmods; i--; )
236     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
237     bp->ndx = ndx;
238 greg 2.6 bp->nadded = 1;
239 greg 2.1 queue_output(bp);
240 greg 2.4 output_catchup(0);
241 greg 2.1 }
242    
243    
244 greg 2.6 /* Get results from child process and add to queue */
245     static void
246     queue_results(int k)
247     {
248     BINQ *bq = new_binq(); /* get results holder */
249     int j;
250    
251     bq->ndx = kida[k].r1;
252     bq->nadded = kida[k].nr;
253     /* read from child */
254     for (j = 0; j < nmods; j++)
255     if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
256     kida[k].infp) != bq->mca[j]->nbins)
257     error(SYSTEM, "read error from render process");
258    
259     queue_output(bq); /* put results in output queue */
260     kida[k].nr = 0; /* mark child as available */
261     }
262    
263    
264 greg 2.1 /* callback to set output spec to NULL (stdout) */
265     static int
266     set_stdout(const LUENT *le, void *p)
267     {
268     (*(MODCONT *)le->data).outspec = NULL;
269     return(0);
270     }
271    
272    
273     /* Start child processes if we can */
274     int
275     in_rchild()
276     {
277     #ifdef _WIN32
278     error(WARNING, "multiprocessing unsupported -- running solo");
279     nproc = 1;
280     return(1);
281     #else
282     /* try to fork ourselves */
283     while (nchild < nproc) {
284     int p0[2], p1[2];
285     int pid;
286     /* prepare i/o pipes */
287 greg 2.3 errno = 0;
288 greg 2.1 if (pipe(p0) < 0 || pipe(p1) < 0)
289     error(SYSTEM, "pipe() call failed!");
290     pid = fork(); /* fork parent process */
291     if (pid == 0) { /* if in child, set up & return true */
292     close(p0[1]); close(p1[0]);
293 greg 2.4 lu_doall(&modconttab, set_stdout, NULL);
294     lu_done(&ofiletab);
295 greg 2.5 while (nchild--) { /* don't share other pipes */
296 greg 2.6 close(kida[nchild].pr.w);
297     fclose(kida[nchild].infp);
298 greg 2.4 }
299 greg 2.1 dup2(p0[0], 0); close(p0[0]);
300     dup2(p1[1], 1); close(p1[1]);
301     inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
302     outfmt = 'd';
303     header = 0;
304     yres = 0;
305     raysleft = 0;
306 greg 2.6 if (accumulate == 1) {
307     waitflush = xres = 1;
308     account = accumulate = 1;
309     } else { /* parent controls accumulation */
310     waitflush = xres = 0;
311     account = accumulate = 0;
312     }
313 greg 2.1 return(1); /* child return value */
314     }
315     if (pid < 0)
316     error(SYSTEM, "fork() call failed!");
317 greg 2.3 /* connect parent's pipes */
318 greg 2.1 close(p0[0]); close(p1[1]);
319 greg 2.6 kida[nchild].pr.r = p1[0];
320     kida[nchild].pr.w = p0[1];
321     kida[nchild].pr.pid = pid;
322     kida[nchild].pr.running = 1;
323     kida[nchild].infp = fdopen(p1[0], "rb");
324     if (kida[nchild].infp == NULL)
325 greg 2.2 error(SYSTEM, "out of memory in in_rchild()");
326 greg 2.3 #ifdef getc_unlocked
327 greg 2.6 flockfile(kida[nchild].infp); /* avoid mutex overhead */
328 greg 2.3 #endif
329 greg 2.6 kida[nchild++].nr = 0; /* mark as available */
330 greg 2.1 }
331     return(0); /* parent return value */
332     #endif
333     }
334    
335    
336     /* Close child processes */
337     void
338     end_children()
339     {
340     int status;
341    
342 greg 2.5 while (nchild > 0) {
343     nchild--;
344 greg 2.6 fclose(kida[nchild].infp);
345     kida[nchild].pr.r = -1; /* close(-1) error is ignored */
346     if ((status = close_process(&kida[nchild].pr)) > 0) {
347 greg 2.1 sprintf(errmsg,
348     "rendering process returned bad status (%d)",
349     status);
350     error(WARNING, errmsg);
351     }
352 greg 2.2 }
353 greg 2.1 }
354    
355    
356 greg 2.5 /* Wait for the next available child, managing output queue simultaneously */
357 greg 2.1 static int
358 greg 2.5 next_child_nq(int flushing)
359 greg 2.1 {
360 greg 2.2 static struct timeval polling;
361 greg 2.4 struct timeval *pmode;
362 greg 2.2 fd_set readset, errset;
363 greg 2.6 int i, n, nr, nqr;
364 greg 2.1
365 greg 2.5 if (!flushing) /* see if there's one free */
366 greg 2.1 for (i = nchild; i--; )
367 greg 2.6 if (!kida[i].nr)
368 greg 2.1 return(i);
369 greg 2.4
370 greg 2.5 nqr = queue_ready(); /* choose blocking mode or polling */
371     if ((nqr > 0) & !flushing)
372     pmode = &polling;
373     else
374 greg 2.4 pmode = NULL;
375 greg 2.5 tryagain: /* catch up with output? */
376     if (pmode == &polling) {
377     if (nqr > nchild) /* don't get too far behind */
378     nqr -= output_catchup(nqr-nchild);
379     } else if (nqr > 0) /* clear output before blocking */
380     nqr -= output_catchup(0);
381 greg 2.1 /* prepare select() call */
382     FD_ZERO(&readset); FD_ZERO(&errset);
383     n = nr = 0;
384     for (i = nchild; i--; ) {
385 greg 2.6 if (kida[i].nr) {
386     FD_SET(kida[i].pr.r, &readset);
387 greg 2.1 ++nr;
388     }
389 greg 2.6 FD_SET(kida[i].pr.r, &errset);
390     if (kida[i].pr.r >= n)
391     n = kida[i].pr.r + 1;
392 greg 2.1 }
393 greg 2.3 if (!nr) /* nothing to wait for? */
394     return(-1);
395 greg 2.2 if ((nr > 1) | (pmode == &polling)) {
396 greg 2.1 errno = 0;
397 greg 2.5 nr = select(n, &readset, NULL, &errset, pmode);
398     if (!nr) {
399 greg 2.2 pmode = NULL; /* try again, blocking this time */
400     goto tryagain;
401     }
402 greg 2.5 if (nr < 0)
403 greg 2.3 error(SYSTEM, "select() error in next_child_nq()");
404 greg 2.1 } else
405     FD_ZERO(&errset);
406     n = -1; /* read results from child(ren) */
407     for (i = nchild; i--; ) {
408 greg 2.6 if (FD_ISSET(kida[i].pr.r, &errset))
409 greg 2.1 error(USER, "rendering process died");
410 greg 2.6 if (FD_ISSET(kida[i].pr.r, &readset))
411     queue_results(n = i);
412 greg 2.1 }
413 greg 2.3 return(n); /* first available child */
414 greg 2.1 }
415    
416    
417     /* Run parental oversight loop */
418     void
419     parental_loop()
420     {
421 greg 2.6 #define MAXIQ (int)(PIPE_BUF/(sizeof(FVECT)*2))
422 greg 2.1 static int ignore_warning_given = 0;
423 greg 2.6 int qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
424     int ninq = 0;
425     FVECT orgdir[2*MAXIQ];
426     int i, n;
427 greg 2.1 /* load rays from stdin & process */
428     #ifdef getc_unlocked
429     flockfile(stdin); /* avoid lock/unlock overhead */
430     #endif
431 greg 2.6 while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
432 greg 2.8 if (orgdir[2*ninq+1][0] == 0.0 && /* asking for flush? */
433     (orgdir[2*ninq+1][1] == 0.0) &
434     (orgdir[2*ninq+1][2] == 0.0)) {
435 greg 2.6 if (accumulate != 1) {
436     if (!ignore_warning_given++)
437     error(WARNING,
438 greg 2.1 "dummy ray(s) ignored during accumulation\n");
439 greg 2.6 continue;
440     }
441 greg 2.1 while (next_child_nq(1) >= 0)
442 greg 2.3 ; /* clear the queue */
443 greg 2.1 lastdone = lastray = 0;
444     if ((yres <= 0) | (xres <= 0))
445 greg 2.5 waitflush = 1; /* flush next */
446 greg 2.3 put_zero_record(++lastray);
447 greg 2.6 } else if (++ninq >= qlimit || accumulate > 1 &&
448     lastray/accumulate != (lastray+ninq)/accumulate) {
449 greg 2.5 i = next_child_nq(0); /* manages output */
450 greg 2.6 n = ninq;
451     if (accumulate != 1) /* request flush? */
452     memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
453     n *= sizeof(FVECT)*2; /* send assignment */
454     if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
455 greg 2.1 error(SYSTEM, "pipe write error");
456 greg 2.6 kida[i].r1 = lastray+1;
457     lastray += kida[i].nr = ninq; /* mark as busy */
458     ninq = 0;
459     if (lastray < lastdone) { /* RNUMBER wrapped? */
460     while (next_child_nq(1) >= 0)
461     ;
462     lastdone = lastray = 0;
463     }
464 greg 2.1 }
465     if (raysleft && !--raysleft)
466     break; /* preemptive EOI */
467     }
468     while (next_child_nq(1) >= 0) /* empty results queue */
469     ;
470     /* output accumulated record */
471     if (accumulate <= 0 || account < accumulate) {
472 greg 2.5 end_children(); /* frees up file descriptors */
473 greg 2.1 if (account < accumulate) {
474     error(WARNING, "partial accumulation in final record");
475     accumulate -= account;
476     }
477     for (i = 0; i < nmods; i++)
478     mod_output(out_bq->mca[i]);
479     end_record();
480 greg 2.3 free_binq(out_bq);
481     out_bq = NULL;
482 greg 2.1 }
483     if (raysleft)
484     error(USER, "unexpected EOF on input");
485     free_binq(NULL); /* clean up */
486     lu_done(&ofiletab);
487 greg 2.6 #undef MAXIQ
488 greg 2.1 }