ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
Revision: 2.2
Committed: Sat Jun 9 16:47:27 2012 UTC (11 years, 10 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.1: +64 -35 lines
Log Message:
Improved process queue handling

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: rc3.c,v 2.1 2012/06/09 07:16:47 greg Exp $";
3 #endif
4 /*
5 * Accumulate ray contributions for a set of materials
6 * Controlling process for multiple children
7 */
8
9 #include "rcontrib.h"
10 #include "platform.h"
11 #include "rtprocess.h"
12 #include "selcall.h"
13
14 /* Modifier contribution queue (results waiting to be output) */
15 typedef struct s_binq {
16 int ndx; /* index for this entry */
17 int n2add; /* number left to add */
18 struct s_binq *next; /* next in queue */
19 MODCONT *mca[1]; /* contrib. array (extends struct) */
20 } BINQ;
21
22 static BINQ *out_bq = NULL; /* output bin queue */
23 static BINQ *free_bq = NULL; /* free queue entries */
24
25 static SUBPROC kida[MAXPROCESS]; /* child processes */
26 static FILE *inq_fp[MAXPROCESS]; /* input streams */
27
28
29 /* Get new (empty) bin queue entry */
30 static BINQ *
31 new_binq()
32 {
33 BINQ *bp = free_bq;
34 int i;
35
36 if (bp != NULL) { /* something already available? */
37 free_bq = bp->next;
38 bp->next = NULL;
39 bp->n2add = accumulate-1;
40 return(bp);
41 }
42 /* else allocate fresh */
43 bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
44 if (bp == NULL)
45 goto memerr;
46 for (i = nmods; i--; ) {
47 MODCONT *mp = (MODCONT *)lu_find(&modconttab,modname[i])->data;
48 bp->mca[i] = (MODCONT *)malloc(sizeof(MODCONT) +
49 sizeof(DCOLOR)*(mp->nbins-1));
50 if (bp->mca[i] == NULL)
51 goto memerr;
52 memcpy(bp->mca[i], mp, sizeof(MODCONT)-sizeof(DCOLOR));
53 /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
54 }
55 bp->ndx = 0;
56 bp->n2add = accumulate-1;
57 bp->next = NULL;
58 return(bp);
59 memerr:
60 error(SYSTEM, "out of memory in new_binq()");
61 return(NULL);
62 }
63
64
65 /* Free a bin queue entry */
66 static void
67 free_binq(BINQ *bp)
68 {
69 int i;
70
71 if (bp == NULL) { /* signal to release our free list */
72 while ((bp = free_bq) != NULL) {
73 free_bq = bp->next;
74 for (i = nmods; i--; )
75 free(bp->mca[i]);
76 /* Note: we don't own bp->mca[i]->binv */
77 free(bp);
78 }
79 return;
80 }
81 /* clear sums for next use */
82 /* for (i = nmods; i--; )
83 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
84 */
85 bp->ndx = 0;
86 bp->next = free_bq; /* push onto free list */
87 free_bq = bp;
88 }
89
90
91 /* Add modifier values to accumulation record in queue and clear */
92 void
93 queue_modifiers()
94 {
95 MODCONT *mpin, *mpout;
96 int i, j;
97
98 if ((accumulate > 0) | (out_bq == NULL))
99 error(CONSISTENCY, "bad call to queue_modifiers()");
100
101 for (i = nmods; i--; ) {
102 mpin = (MODCONT *)lu_find(&modconttab,modname[i])->data;
103 mpout = out_bq->mca[i];
104 for (j = mpout->nbins; j--; )
105 addcolor(mpout->cbin[j], mpin->cbin[j]);
106 memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
107 }
108 }
109
110
111 /* Sum one modifier record into another (doesn't update n2add) */
112 static void
113 add_modbin(BINQ *dst, BINQ *src)
114 {
115 int i, j;
116
117 for (i = nmods; i--; ) {
118 MODCONT *mpin = src->mca[i];
119 MODCONT *mpout = dst->mca[i];
120 for (j = mpout->nbins; j--; )
121 addcolor(mpout->cbin[j], mpin->cbin[j]);
122 }
123 }
124
125
126 /* Queue values for later output */
127 static void
128 queue_output(BINQ *bp)
129 {
130 BINQ *b_last, *b_cur;
131
132 if (accumulate <= 0) { /* just accumulating? */
133 if (out_bq == NULL) {
134 bp->next = NULL;
135 out_bq = bp;
136 } else {
137 add_modbin(out_bq, bp);
138 free_binq(bp);
139 }
140 return;
141 }
142 b_last = NULL; /* else insert in output queue */
143 for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
144 b_cur = b_cur->next)
145 b_last = b_cur;
146 if (b_last != NULL) {
147 bp->next = b_cur;
148 b_last->next = bp;
149 } else {
150 bp->next = out_bq;
151 out_bq = bp;
152 }
153 if (accumulate <= 1) /* no accumulating? */
154 return;
155 b_cur = out_bq; /* else merge accumulation entries */
156 while (b_cur->next != NULL) {
157 if (b_cur->n2add <= 0 ||
158 (b_cur->ndx-1)/accumulate !=
159 (b_cur->next->ndx-1)/accumulate) {
160 b_cur = b_cur->next;
161 continue;
162 }
163 add_modbin(b_cur, b_cur->next);
164 b_cur->n2add--;
165 b_last = b_cur->next;
166 b_cur->next = b_last->next;
167 b_last->next = NULL;
168 free_binq(b_last);
169 }
170 }
171
172
173 /* Get current with output FIFO by producing ready results */
174 static int
175 output_catchup()
176 {
177 int nout = 0;
178 BINQ *bp;
179 int i;
180
181 if (accumulate <= 0) /* just accumulating? */
182 return(0);
183 /* else output ready results */
184 while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
185 bp = out_bq; /* pop off first entry */
186 out_bq = bp->next;
187 bp->next = NULL;
188 for (i = 0; i < nmods; i++) /* output record */
189 mod_output(bp->mca[i]);
190 end_record();
191 free_binq(bp); /* free this entry */
192 lastdone += accumulate;
193 ++nout;
194 }
195 return(nout);
196 }
197
198
199 /* Put a zero record in results queue */
200 void
201 zero_record(int ndx)
202 {
203 BINQ *bp = new_binq();
204 int i;
205
206 for (i = nmods; i--; )
207 memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
208 bp->ndx = ndx;
209 queue_output(bp);
210 }
211
212
213 /* callback to set output spec to NULL (stdout) */
214 static int
215 set_stdout(const LUENT *le, void *p)
216 {
217 (*(MODCONT *)le->data).outspec = NULL;
218 return(0);
219 }
220
221
222 /* Start child processes if we can */
223 int
224 in_rchild()
225 {
226 #ifdef _WIN32
227 error(WARNING, "multiprocessing unsupported -- running solo");
228 nproc = 1;
229 return(1);
230 #else
231 /* try to fork ourselves */
232 while (nchild < nproc) {
233 int p0[2], p1[2];
234 int pid;
235 /* prepare i/o pipes */
236 if (pipe(p0) < 0 || pipe(p1) < 0)
237 error(SYSTEM, "pipe() call failed!");
238 pid = fork(); /* fork parent process */
239 if (pid == 0) { /* if in child, set up & return true */
240 close(p0[1]); close(p1[0]);
241 dup2(p0[0], 0); close(p0[0]);
242 dup2(p1[1], 1); close(p1[1]);
243 inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
244 outfmt = 'd';
245 header = 0;
246 waitflush = xres = 1;
247 yres = 0;
248 raysleft = 0;
249 account = accumulate = 1;
250 lu_doall(&modconttab, set_stdout, NULL);
251 nchild = -1;
252 return(1); /* child return value */
253 }
254 if (pid < 0)
255 error(SYSTEM, "fork() call failed!");
256 /* connect our pipes */
257 close(p0[0]); close(p1[1]);
258 kida[nchild].r = p1[0];
259 kida[nchild].w = p0[1];
260 kida[nchild].pid = pid;
261 kida[nchild].running = -1;
262 inq_fp[nchild] = fdopen(p1[0], "rb");
263 if (inq_fp[nchild] == NULL)
264 error(SYSTEM, "out of memory in in_rchild()");
265 ++nchild;
266 }
267 return(0); /* parent return value */
268 #endif
269 }
270
271
272 /* Close child processes */
273 void
274 end_children()
275 {
276 int status;
277
278 while (nchild-- > 0) {
279 kida[nchild].r = -1; /* close(-1) error is ignored */
280 if ((status = close_process(&kida[nchild])) > 0) {
281 sprintf(errmsg,
282 "rendering process returned bad status (%d)",
283 status);
284 error(WARNING, errmsg);
285 }
286 fclose(inq_fp[nchild]); /* performs actual close() */
287 }
288 }
289
290
291 /* Wait for the next available child, managing output queue as well */
292 static int
293 next_child_nq(int force_wait)
294 {
295 static struct timeval polling;
296 struct timeval *pmode = force_wait | (accumulate <= 0) ?
297 (struct timeval *)NULL : &polling;
298 fd_set readset, errset;
299 int i, j, n, nr;
300
301 if (!force_wait) /* see if there's one free */
302 for (i = nchild; i--; )
303 if (kida[i].running < 0)
304 return(i);
305 /* prepare select() call */
306 FD_ZERO(&readset); FD_ZERO(&errset);
307 n = nr = 0;
308 for (i = nchild; i--; ) {
309 if (kida[i].running > 0) {
310 FD_SET(kida[i].r, &readset);
311 ++nr;
312 }
313 FD_SET(kida[i].r, &errset);
314 if (kida[i].r >= n)
315 n = kida[i].r + 1;
316 }
317 if (!nr) /* nothing going on */
318 return(-1);
319 tryagain:
320 if (pmode == NULL) /* about to block, so catch up */
321 output_catchup();
322 if ((nr > 1) | (pmode == &polling)) {
323 errno = 0;
324 nr = select(n, &readset, NULL, &errset, pmode);
325 if (!nr & (pmode == &polling)) {
326 pmode = NULL; /* try again, blocking this time */
327 goto tryagain;
328 }
329 if (nr <= 0)
330 error(SYSTEM, "select call error in next_child_nq()");
331 } else
332 FD_ZERO(&errset);
333 n = -1; /* read results from child(ren) */
334 for (i = nchild; i--; ) {
335 BINQ *bq;
336 if (FD_ISSET(kida[i].r, &errset))
337 error(USER, "rendering process died");
338 if (!FD_ISSET(kida[i].r, &readset))
339 continue;
340 bq = new_binq(); /* get results holder */
341 bq->ndx = kida[i].running;
342 /* read from child */
343 for (j = 0; j < nmods; j++) {
344 n = bq->mca[j]->nbins;
345 nr = fread(bq->mca[j]->cbin,sizeof(DCOLOR),n,inq_fp[i]);
346 if (nr != n)
347 error(SYSTEM, "read error from render process");
348 }
349 queue_output(bq); /* put results in output queue */
350 kida[i].running = -1; /* mark child as available */
351 n = i;
352 }
353 return(n); /* last available child */
354 }
355
356
357 /* Run parental oversight loop */
358 void
359 parental_loop()
360 {
361 static int ignore_warning_given = 0;
362 FVECT orgdir[2];
363 double d;
364 /* load rays from stdin & process */
365 #ifdef getc_unlocked
366 flockfile(stdin); /* avoid lock/unlock overhead */
367 #endif
368 while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
369 d = normalize(orgdir[1]);
370 /* asking for flush? */
371 if ((d == 0.0) & (accumulate != 1)) {
372 if (!ignore_warning_given++)
373 error(WARNING,
374 "dummy ray(s) ignored during accumulation\n");
375 continue;
376 }
377 if ((d == 0.0) | (lastray+1 < lastray)) {
378 while (next_child_nq(1) >= 0)
379 ;
380 lastdone = lastray = 0;
381 }
382 if (d == 0.0) {
383 if ((yres <= 0) | (xres <= 0))
384 waitflush = 1; /* flush right after */
385 zero_record(++lastray);
386 } else { /* else assign */
387 int avail = next_child_nq(0);
388 if (writebuf(kida[avail].w, (char *)orgdir,
389 sizeof(FVECT)*2) != sizeof(FVECT)*2)
390 error(SYSTEM, "pipe write error");
391 kida[avail].running = ++lastray;
392 }
393 if (raysleft && !--raysleft)
394 break; /* preemptive EOI */
395 }
396 while (next_child_nq(1) >= 0) /* empty results queue */
397 ;
398 /* output accumulated record */
399 if (accumulate <= 0 || account < accumulate) {
400 int i;
401 if (account < accumulate) {
402 error(WARNING, "partial accumulation in final record");
403 accumulate -= account;
404 }
405 for (i = 0; i < nmods; i++)
406 mod_output(out_bq->mca[i]);
407 end_record();
408 }
409 if (raysleft)
410 error(USER, "unexpected EOF on input");
411 free_binq(NULL); /* clean up */
412 lu_done(&ofiletab);
413 }