ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.3
Committed: Sun Jun 10 05:25:42 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.2: +33 -26 lines
Log Message:
Tweaks and bug fixes to new code

File Contents

# Content
#if !defined(lint)
/* RCS keyword string for this file; omitted when `lint` is defined */
static const char	RCSid[] = "$Id: rc3.c,v 2.2 2012/06/09 16:47:27 greg Exp $";
#endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include "rcontrib.h"
10 #include "platform.h"
11 #include "rtprocess.h"
12 #include "selcall.h"
13
14 /* Modifier contribution queue (results waiting to be output) */
15 typedef struct s_binq {
16 int ndx; /* index for this entry */
17 int n2add; /* number left to add */
18 struct s_binq *next; /* next in queue */
19 MODCONT *mca[1]; /* contrib. array (extends struct) */
20 } BINQ;
21
22 static BINQ *out_bq = NULL; /* output bin queue */
23 static BINQ *free_bq = NULL; /* free queue entries */
24
25 static SUBPROC kida[MAXPROCESS]; /* child processes */
26 static FILE *inq_fp[MAXPROCESS]; /* input streams */
27
28
29 /* Get new (empty) bin queue entry */
30 static BINQ *
31 new_binq()
32 {
33 BINQ *bp = free_bq;
34 int i;
35
36 if (bp != NULL) { /* something already available? */
37 free_bq = bp->next;
38 bp->next = NULL;
39 bp->n2add = accumulate-1;
40 return(bp);
41 }
42 /* else allocate fresh */
43 bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
44 if (bp == NULL)
45 goto memerr;
46 for (i = nmods; i--; ) {
47 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
48 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
49 sizeof(DCOLOR)*(mp->nbins-1));
50 if (bp->mca[i] == NULL)
51 goto memerr;
52 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
53 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
54 }
55 bp->ndx = 0;
56 bp->n2add = accumulate-1;
57 bp->next = NULL;
58 return(bp);
59 memerr:
60 error(SYSTEM, "out of memory in new_binq()");
61 return(NULL);
62 }
63
64
65 /* Free a bin queue entry */
66 static void
67 free_binq(BINQ *bp)
68 {
69 int i;
70
71 if (bp == NULL) { /* signal to release our free list */
72 while ((bp = free_bq) != NULL) {
73 free_bq = bp->next;
74 for (i = nmods; i--; )
75 free(bp->mca[i]);
76 /* Note: we don't own bp->mca[i]->binv */
77 free(bp);
78 }
79 return;
80 }
81 /* clear sums for next use */
82 /* for (i = nmods; i--; )
83 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
84 */
85 bp->ndx = 0;
86 bp->next = free_bq; /* push onto free list */
87 free_bq = bp;
88 }
89
90
91 /* Add modifier values to accumulation record in queue and clear */
92 void
93 queue_modifiers()
94 {
95 MODCONT *mpin, *mpout;
96 int i, j;
97
98 if ((accumulate > 0) | (out_bq == NULL))
99 error(CONSISTENCY, "bad call to queue_modifiers()");
100
101 for (i = nmods; i--; ) {
102 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
103 mpout = out_bq->mca[i];
104 for (j = mpout->nbins; j--; )
105 addcolor(mpout->cbin[j], mpin->cbin[j]);
106 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
107 }
108 }
109
110
111 /* Sum one modifier record into another (doesn't update n2add) */
112 static void
113 add_modbin(BINQ *dst, BINQ *src)
114 {
115 int i, j;
116
117 for (i = nmods; i--; ) {
118 MODCONT *mpin = src->mca[i];
119 MODCONT *mpout = dst->mca[i];
120 for (j = mpout->nbins; j--; )
121 addcolor(mpout->cbin[j], mpin->cbin[j]);
122 }
123 }
124
125
126 /* Queue values for later output */
127 static void
128 queue_output(BINQ *bp)
129 {
130 BINQ *b_last, *b_cur;
131
132 if (accumulate <= 0) { /* just accumulating? */
133 if (out_bq == NULL) {
134 bp->next = NULL;
135 out_bq = bp;
136 } else {
137 add_modbin(out_bq, bp);
138 free_binq(bp);
139 }
140 return;
141 }
142 b_last = NULL; /* else insert in output queue */
143 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
144 b_cur = b_cur->next)
145 b_last = b_cur;
146 if (b_last != NULL) {
147 bp->next = b_cur;
148 b_last->next = bp;
149 } else {
150 bp->next = out_bq;
151 out_bq = bp;
152 }
153 if (accumulate == 1) /* no accumulation? */
154 return;
155 b_cur = out_bq; /* else merge accumulation entries */
156 while (b_cur->next != NULL) {
157 if (b_cur->n2add <= 0 ||
158 (b_cur->ndx-1)/accumulate !=
159 (b_cur->next->ndx-1)/accumulate) {
160 b_cur = b_cur->next;
161 continue;
162 }
163 add_modbin(b_cur, b_cur->next);
164 b_cur->n2add--;
165 b_last = b_cur->next;
166 b_cur->next = b_last->next;
167 b_last->next = NULL;
168 free_binq(b_last);
169 }
170 }
171
172
173 /* Get current with output queue by producing ready results */
174 static int
175 output_catchup()
176 {
177 int nout = 0;
178 BINQ *bp;
179 int i;
180
181 if (accumulate <= 0) /* just accumulating? */
182 return(0);
183 /* else output ready results */
184 while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
185 bp = out_bq; /* pop off first entry */
186 out_bq = bp->next;
187 bp->next = NULL;
188 for (i = 0; i < nmods; i++) /* output record */
189 mod_output(bp->mca[i]);
190 end_record();
191 free_binq(bp); /* free this entry */
192 lastdone += accumulate;
193 ++nout;
194 }
195 return(nout);
196 }
197
198
199 /* Put a zero record in results queue & output */
200 void
201 put_zero_record(int ndx)
202 {
203 BINQ *bp = new_binq();
204 int i;
205
206 for (i = nmods; i--; )
207 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
208 bp->ndx = ndx;
209 queue_output(bp);
210 output_catchup();
211 }
212
213
214 /* callback to set output spec to NULL (stdout) */
215 static int
216 set_stdout(const LUENT *le, void *p)
217 {
218 (*(MODCONT *)le->data).outspec = NULL;
219 return(0);
220 }
221
222
223 /* Start child processes if we can */
224 int
225 in_rchild()
226 {
227 #ifdef _WIN32
228 error(WARNING, "multiprocessing unsupported -- running solo");
229 nproc = 1;
230 return(1);
231 #else
232 /* try to fork ourselves */
233 while (nchild < nproc) {
234 int p0[2], p1[2];
235 int pid;
236 /* prepare i/o pipes */
237 errno = 0;
238 if (pipe(p0) < 0 || pipe(p1) < 0)
239 error(SYSTEM, "pipe() call failed!");
240 pid = fork(); /* fork parent process */
241 if (pid == 0) { /* if in child, set up & return true */
242 close(p0[1]); close(p1[0]);
243 dup2(p0[0], 0); close(p0[0]);
244 dup2(p1[1], 1); close(p1[1]);
245 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
246 outfmt = 'd';
247 header = 0;
248 waitflush = xres = 1;
249 yres = 0;
250 raysleft = 0;
251 account = accumulate = 1;
252 lu_doall(&modconttab, set_stdout, NULL);
253 nchild = -1;
254 return(1); /* child return value */
255 }
256 if (pid < 0)
257 error(SYSTEM, "fork() call failed!");
258 /* connect parent's pipes */
259 close(p0[0]); close(p1[1]);
260 kida[nchild].r = p1[0];
261 kida[nchild].w = p0[1];
262 kida[nchild].pid = pid;
263 kida[nchild].running = -1;
264 inq_fp[nchild] = fdopen(p1[0], "rb");
265 if (inq_fp[nchild] == NULL)
266 error(SYSTEM, "out of memory in in_rchild()");
267 #ifdef getc_unlocked
268 flockfile(inq_fp[nchild]); /* avoid mutex overhead */
269 #endif
270 ++nchild;
271 }
272 return(0); /* parent return value */
273 #endif
274 }
275
276
277 /* Close child processes */
278 void
279 end_children()
280 {
281 int status;
282
283 while (nchild-- > 0) {
284 kida[nchild].r = -1; /* close(-1) error is ignored */
285 if ((status = close_process(&kida[nchild])) > 0) {
286 sprintf(errmsg,
287 "rendering process returned bad status (%d)",
288 status);
289 error(WARNING, errmsg);
290 }
291 fclose(inq_fp[nchild]); /* performs actual close() */
292 }
293 }
294
295
296 /* Wait for the next available child, managing output queue as well */
297 static int
298 next_child_nq(int force_wait)
299 {
300 static struct timeval polling;
301 struct timeval *pmode = force_wait | (accumulate <= 0) ?
302 (struct timeval *)NULL : &polling;
303 fd_set readset, errset;
304 int i, j, n, nr;
305
306 if (!force_wait) /* see if there's one free */
307 for (i = nchild; i--; )
308 if (kida[i].running < 0)
309 return(i);
310 /* prepare select() call */
311 FD_ZERO(&readset); FD_ZERO(&errset);
312 n = nr = 0;
313 for (i = nchild; i--; ) {
314 if (kida[i].running > 0) {
315 FD_SET(kida[i].r, &readset);
316 ++nr;
317 }
318 FD_SET(kida[i].r, &errset);
319 if (kida[i].r >= n)
320 n = kida[i].r + 1;
321 }
322 tryagain:
323 if (pmode == NULL) /* catch up in case we block */
324 output_catchup();
325 if (!nr) /* nothing to wait for? */
326 return(-1);
327 if ((nr > 1) | (pmode == &polling)) {
328 errno = 0;
329 i = select(n, &readset, NULL, &errset, pmode);
330 if (!i) {
331 pmode = NULL; /* try again, blocking this time */
332 goto tryagain;
333 }
334 if (i < 0)
335 error(SYSTEM, "select() error in next_child_nq()");
336 } else
337 FD_ZERO(&errset);
338 n = -1; /* read results from child(ren) */
339 for (i = nchild; i--; ) {
340 BINQ *bq;
341 if (FD_ISSET(kida[i].r, &errset))
342 error(USER, "rendering process died");
343 if (!FD_ISSET(kida[i].r, &readset))
344 continue;
345 bq = new_binq(); /* get results holder */
346 bq->ndx = kida[i].running;
347 /* read from child */
348 for (j = 0; j < nmods; j++) {
349 nr = bq->mca[j]->nbins;
350 if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), nr,
351 inq_fp[i]) != nr)
352 error(SYSTEM, "read error from render process");
353 }
354 queue_output(bq); /* add results to output queue */
355 kida[i].running = -1; /* mark child as available */
356 n = i;
357 }
358 return(n); /* first available child */
359 }
360
361
362 /* Run parental oversight loop */
363 void
364 parental_loop()
365 {
366 static int ignore_warning_given = 0;
367 FVECT orgdir[2];
368 double d;
369 int i;
370 /* load rays from stdin & process */
371 #ifdef getc_unlocked
372 flockfile(stdin); /* avoid lock/unlock overhead */
373 #endif
374 while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
375 d = normalize(orgdir[1]);
376 /* asking for flush? */
377 if ((d == 0.0) & (accumulate != 1)) {
378 if (!ignore_warning_given++)
379 error(WARNING,
380 "dummy ray(s) ignored during accumulation\n");
381 continue;
382 }
383 if ((d == 0.0) | (lastray+1 < lastray)) {
384 while (next_child_nq(1) >= 0)
385 ; /* clear the queue */
386 lastdone = lastray = 0;
387 }
388 if (d == 0.0) {
389 if ((yres <= 0) | (xres <= 0))
390 waitflush = 1; /* flush right after */
391 put_zero_record(++lastray);
392 } else { /* else assign ray */
393 i = next_child_nq(0);
394 if (writebuf(kida[i].w, (char *)orgdir,
395 sizeof(orgdir)) != sizeof(orgdir))
396 error(SYSTEM, "pipe write error");
397 kida[i].running = ++lastray;
398 }
399 if (raysleft && !--raysleft)
400 break; /* preemptive EOI */
401 }
402 while (next_child_nq(1) >= 0) /* empty results queue */
403 ;
404 /* output accumulated record */
405 if (accumulate <= 0 || account < accumulate) {
406 if (account < accumulate) {
407 error(WARNING, "partial accumulation in final record");
408 accumulate -= account;
409 }
410 for (i = 0; i < nmods; i++)
411 mod_output(out_bq->mca[i]);
412 end_record();
413 free_binq(out_bq);
414 out_bq = NULL;
415 }
416 if (raysleft)
417 error(USER, "unexpected EOF on input");
418 free_binq(NULL); /* clean up */
419 lu_done(&ofiletab);
420 }