ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.3
Committed: Sun Jun 10 05:25:42 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.2: +33 -26 lines
Log Message:
Tweaks and bug fixes to new code

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.3 static const char RCSid[] = "$Id: rc3.c,v 2.2 2012/06/09 16:47:27 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * Accumulate ray contributions for a set of materials
6     * Controlling process for multiple children
7     */
8    
9     #include "rcontrib.h"
10     #include "platform.h"
11     #include "rtprocess.h"
12     #include "selcall.h"
13    
14     /* Modifier contribution queue (results waiting to be output) */
15     typedef struct s_binq {
16     int ndx; /* index for this entry */
17     int n2add; /* number left to add */
18     struct s_binq *next; /* next in queue */
19     MODCONT *mca[1]; /* contrib. array (extends struct) */
20     } BINQ;
21    
22     static BINQ *out_bq = NULL; /* output bin queue */
23     static BINQ *free_bq = NULL; /* free queue entries */
24    
25     static SUBPROC kida[MAXPROCESS]; /* child processes */
26 greg 2.2 static FILE *inq_fp[MAXPROCESS]; /* input streams */
27 greg 2.1
28    
29     /* Get new (empty) bin queue entry */
30     static BINQ *
31     new_binq()
32     {
33     BINQ *bp = free_bq;
34     int i;
35    
36     if (bp != NULL) { /* something already available? */
37     free_bq = bp->next;
38     bp->next = NULL;
39     bp->n2add = accumulate-1;
40     return(bp);
41     }
42     /* else allocate fresh */
43     bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
44     if (bp == NULL)
45     goto memerr;
46     for (i = nmods; i--; ) {
47     MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
48     bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
49     sizeof(DCOLOR)*(mp->nbins-1));
50     if (bp->mca[i] == NULL)
51     goto memerr;
52     memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
53     /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
54     }
55     bp->ndx = 0;
56     bp->n2add = accumulate-1;
57     bp->next = NULL;
58     return(bp);
59     memerr:
60     error(SYSTEM, "out of memory in new_binq()");
61     return(NULL);
62     }
63    
64    
65     /* Free a bin queue entry */
66     static void
67     free_binq(BINQ *bp)
68     {
69     int i;
70    
71     if (bp == NULL) { /* signal to release our free list */
72     while ((bp = free_bq) != NULL) {
73     free_bq = bp->next;
74     for (i = nmods; i--; )
75     free(bp->mca[i]);
76     /* Note: we don't own bp->mca[i]->binv */
77     free(bp);
78     }
79     return;
80     }
81     /* clear sums for next use */
82     /* for (i = nmods; i--; )
83     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
84     */
85     bp->ndx = 0;
86     bp->next = free_bq; /* push onto free list */
87     free_bq = bp;
88     }
89    
90    
91     /* Add modifier values to accumulation record in queue and clear */
92     void
93     queue_modifiers()
94     {
95     MODCONT *mpin, *mpout;
96     int i, j;
97    
98     if ((accumulate > 0) | (out_bq == NULL))
99     error(CONSISTENCY, "bad call to queue_modifiers()");
100    
101     for (i = nmods; i--; ) {
102     mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
103     mpout = out_bq->mca[i];
104     for (j = mpout->nbins; j--; )
105     addcolor(mpout->cbin[j], mpin->cbin[j]);
106     memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
107     }
108     }
109    
110    
111     /* Sum one modifier record into another (doesn't update n2add) */
112     static void
113     add_modbin(BINQ *dst, BINQ *src)
114     {
115     int i, j;
116    
117     for (i = nmods; i--; ) {
118     MODCONT *mpin = src->mca[i];
119     MODCONT *mpout = dst->mca[i];
120     for (j = mpout->nbins; j--; )
121     addcolor(mpout->cbin[j], mpin->cbin[j]);
122     }
123     }
124    
125    
126 greg 2.2 /* Queue values for later output */
127     static void
128 greg 2.1 queue_output(BINQ *bp)
129     {
130     BINQ *b_last, *b_cur;
131    
132     if (accumulate <= 0) { /* just accumulating? */
133     if (out_bq == NULL) {
134     bp->next = NULL;
135     out_bq = bp;
136     } else {
137     add_modbin(out_bq, bp);
138     free_binq(bp);
139     }
140 greg 2.2 return;
141 greg 2.1 }
142     b_last = NULL; /* else insert in output queue */
143     for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
144     b_cur = b_cur->next)
145     b_last = b_cur;
146     if (b_last != NULL) {
147     bp->next = b_cur;
148     b_last->next = bp;
149     } else {
150     bp->next = out_bq;
151     out_bq = bp;
152     }
153 greg 2.3 if (accumulate == 1) /* no accumulation? */
154 greg 2.2 return;
155     b_cur = out_bq; /* else merge accumulation entries */
156     while (b_cur->next != NULL) {
157     if (b_cur->n2add <= 0 ||
158     (b_cur->ndx-1)/accumulate !=
159     (b_cur->next->ndx-1)/accumulate) {
160     b_cur = b_cur->next;
161     continue;
162 greg 2.1 }
163 greg 2.2 add_modbin(b_cur, b_cur->next);
164     b_cur->n2add--;
165     b_last = b_cur->next;
166     b_cur->next = b_last->next;
167     b_last->next = NULL;
168     free_binq(b_last);
169 greg 2.1 }
170 greg 2.2 }
171    
172    
173 greg 2.3 /* Get current with output queue by producing ready results */
174 greg 2.2 static int
175     output_catchup()
176     {
177     int nout = 0;
178     BINQ *bp;
179     int i;
180    
181     if (accumulate <= 0) /* just accumulating? */
182     return(0);
183     /* else output ready results */
184 greg 2.1 while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
185 greg 2.2 bp = out_bq; /* pop off first entry */
186     out_bq = bp->next;
187     bp->next = NULL;
188 greg 2.1 for (i = 0; i < nmods; i++) /* output record */
189 greg 2.2 mod_output(bp->mca[i]);
190 greg 2.1 end_record();
191 greg 2.2 free_binq(bp); /* free this entry */
192 greg 2.1 lastdone += accumulate;
193     ++nout;
194     }
195     return(nout);
196     }
197    
198    
199 greg 2.3 /* Put a zero record in results queue & output */
200 greg 2.1 void
201 greg 2.3 put_zero_record(int ndx)
202 greg 2.1 {
203     BINQ *bp = new_binq();
204     int i;
205    
206     for (i = nmods; i--; )
207     memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
208     bp->ndx = ndx;
209     queue_output(bp);
210 greg 2.3 output_catchup();
211 greg 2.1 }
212    
213    
214     /* callback to set output spec to NULL (stdout) */
215     static int
216     set_stdout(const LUENT *le, void *p)
217     {
218     (*(MODCONT *)le->data).outspec = NULL;
219     return(0);
220     }
221    
222    
223     /* Start child processes if we can */
224     int
225     in_rchild()
226     {
227     #ifdef _WIN32
228     error(WARNING, "multiprocessing unsupported -- running solo");
229     nproc = 1;
230     return(1);
231     #else
232     /* try to fork ourselves */
233     while (nchild < nproc) {
234     int p0[2], p1[2];
235     int pid;
236     /* prepare i/o pipes */
237 greg 2.3 errno = 0;
238 greg 2.1 if (pipe(p0) < 0 || pipe(p1) < 0)
239     error(SYSTEM, "pipe() call failed!");
240     pid = fork(); /* fork parent process */
241     if (pid == 0) { /* if in child, set up & return true */
242     close(p0[1]); close(p1[0]);
243     dup2(p0[0], 0); close(p0[0]);
244     dup2(p1[1], 1); close(p1[1]);
245     inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
246     outfmt = 'd';
247     header = 0;
248     waitflush = xres = 1;
249     yres = 0;
250     raysleft = 0;
251     account = accumulate = 1;
252     lu_doall(&modconttab, set_stdout, NULL);
253     nchild = -1;
254     return(1); /* child return value */
255     }
256     if (pid < 0)
257     error(SYSTEM, "fork() call failed!");
258 greg 2.3 /* connect parent's pipes */
259 greg 2.1 close(p0[0]); close(p1[1]);
260     kida[nchild].r = p1[0];
261     kida[nchild].w = p0[1];
262     kida[nchild].pid = pid;
263     kida[nchild].running = -1;
264 greg 2.2 inq_fp[nchild] = fdopen(p1[0], "rb");
265     if (inq_fp[nchild] == NULL)
266     error(SYSTEM, "out of memory in in_rchild()");
267 greg 2.3 #ifdef getc_unlocked
268     flockfile(inq_fp[nchild]); /* avoid mutex overhead */
269     #endif
270 greg 2.1 ++nchild;
271     }
272     return(0); /* parent return value */
273     #endif
274     }
275    
276    
277     /* Close child processes */
278     void
279     end_children()
280     {
281     int status;
282    
283 greg 2.2 while (nchild-- > 0) {
284     kida[nchild].r = -1; /* close(-1) error is ignored */
285 greg 2.1 if ((status = close_process(&kida[nchild])) > 0) {
286     sprintf(errmsg,
287     "rendering process returned bad status (%d)",
288     status);
289     error(WARNING, errmsg);
290     }
291 greg 2.2 fclose(inq_fp[nchild]); /* performs actual close() */
292     }
293 greg 2.1 }
294    
295    
296 greg 2.2 /* Wait for the next available child, managing output queue as well */
297 greg 2.1 static int
298     next_child_nq(int force_wait)
299     {
300 greg 2.2 static struct timeval polling;
301     struct timeval *pmode = force_wait | (accumulate <= 0) ?
302     (struct timeval *)NULL : &polling;
303     fd_set readset, errset;
304     int i, j, n, nr;
305 greg 2.1
306     if (!force_wait) /* see if there's one free */
307     for (i = nchild; i--; )
308     if (kida[i].running < 0)
309     return(i);
310     /* prepare select() call */
311     FD_ZERO(&readset); FD_ZERO(&errset);
312     n = nr = 0;
313     for (i = nchild; i--; ) {
314     if (kida[i].running > 0) {
315     FD_SET(kida[i].r, &readset);
316     ++nr;
317     }
318     FD_SET(kida[i].r, &errset);
319     if (kida[i].r >= n)
320     n = kida[i].r + 1;
321     }
322 greg 2.2 tryagain:
323 greg 2.3 if (pmode == NULL) /* catch up in case we block */
324 greg 2.2 output_catchup();
325 greg 2.3 if (!nr) /* nothing to wait for? */
326     return(-1);
327 greg 2.2 if ((nr > 1) | (pmode == &polling)) {
328 greg 2.1 errno = 0;
329 greg 2.3 i = select(n, &readset, NULL, &errset, pmode);
330     if (!i) {
331 greg 2.2 pmode = NULL; /* try again, blocking this time */
332     goto tryagain;
333     }
334 greg 2.3 if (i < 0)
335     error(SYSTEM, "select() error in next_child_nq()");
336 greg 2.1 } else
337     FD_ZERO(&errset);
338     n = -1; /* read results from child(ren) */
339     for (i = nchild; i--; ) {
340     BINQ *bq;
341     if (FD_ISSET(kida[i].r, &errset))
342     error(USER, "rendering process died");
343     if (!FD_ISSET(kida[i].r, &readset))
344     continue;
345     bq = new_binq(); /* get results holder */
346     bq->ndx = kida[i].running;
347     /* read from child */
348     for (j = 0; j < nmods; j++) {
349 greg 2.3 nr = bq->mca[j]->nbins;
350     if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr,
351     inq_fp[i]) != nr)
352 greg 2.1 error(SYSTEM, "read error from render process");
353     }
354 greg 2.3 queue_output(bq); /* add results to output queue */
355 greg 2.1 kida[i].running = -1; /* mark child as available */
356     n = i;
357     }
358 greg 2.3 return(n); /* first available child */
359 greg 2.1 }
360    
361    
362     /* Run parental oversight loop */
363     void
364     parental_loop()
365     {
366     static int ignore_warning_given = 0;
367     FVECT orgdir[2];
368     double d;
369 greg 2.3 int i;
370 greg 2.1 /* load rays from stdin & process */
371     #ifdef getc_unlocked
372     flockfile(stdin); /* avoid lock/unlock overhead */
373     #endif
374     while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
375     d = normalize(orgdir[1]);
376     /* asking for flush? */
377     if ((d == 0.0) & (accumulate != 1)) {
378     if (!ignore_warning_given++)
379     error(WARNING,
380     "dummy ray(s) ignored during accumulation\n");
381     continue;
382     }
383     if ((d == 0.0) | (lastray+1 < lastray)) {
384     while (next_child_nq(1) >= 0)
385 greg 2.3 ; /* clear the queue */
386 greg 2.1 lastdone = lastray = 0;
387     }
388     if (d == 0.0) {
389     if ((yres <= 0) | (xres <= 0))
390     waitflush = 1; /* flush right after */
391 greg 2.3 put_zero_record(++lastray);
392     } else { /* else assign ray */
393     i = next_child_nq(0);
394     if (writebuf(kida[i].w, (char *)orgdir,
395     sizeof(orgdir)) != sizeof(orgdir))
396 greg 2.1 error(SYSTEM, "pipe write error");
397 greg 2.3 kida[i].running = ++lastray;
398 greg 2.1 }
399     if (raysleft && !--raysleft)
400     break; /* preemptive EOI */
401     }
402     while (next_child_nq(1) >= 0) /* empty results queue */
403     ;
404     /* output accumulated record */
405     if (accumulate <= 0 || account < accumulate) {
406     if (account < accumulate) {
407     error(WARNING, "partial accumulation in final record");
408     accumulate -= account;
409     }
410     for (i = 0; i < nmods; i++)
411     mod_output(out_bq->mca[i]);
412     end_record();
413 greg 2.3 free_binq(out_bq);
414     out_bq = NULL;
415 greg 2.1 }
416     if (raysleft)
417     error(USER, "unexpected EOF on input");
418     free_binq(NULL); /* clean up */
419     lu_done(&ofiletab);
420     }