root/radiance/ray/src/rt/raypcalls.c
Revision: 2.22
Committed: Sat Dec 12 05:20:10 2009 UTC by greg
Content type: text/plain
Branch: MAIN
Changes since 2.21: +10 -11 lines
Log Message:
Created FIFO calls for ray multiprocessing

File Contents

#ifndef lint
static const char RCSid[] = "$Id: raypcalls.c,v 2.21 2009/12/12 00:03:42 greg Exp $";
#endif
/*
 * raypcalls.c - interface for parallel rendering using Radiance
 *
 * External symbols declared in ray.h
 */

#include "copyright.h"

/*
 * These calls are designed similarly to the ones in raycalls.c,
 * but allow for multiple rendering processes on the same host
 * machine. There is no sense in specifying more child processes
 * than you have processor cores, but one child may help by allowing
 * asynchronous ray computation in an interactive program, and
 * will protect the caller from fatal rendering errors.
 *
 * You should first read and understand the header in raycalls.c,
 * as some things are explained there that are not repeated here.
 *
 * The first step is opening one or more rendering processes
 * with a call to ray_pinit(oct, nproc). Before calling fork(),
 * ray_pinit() loads the octree and data structures into the
 * caller's memory, and ray_popen() synchronizes the ambient
 * file, if any. Shared memory permits all sorts of queries
 * that wouldn't otherwise be possible, without causing any real
 * memory overhead, since all the static data are shared
 * between processes. Rays are traced using a simple
 * queuing mechanism, explained below.
 *
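 * A minimal calling sequence, then, looks something like the
 * following sketch ("scene.oct" and NCORES are hypothetical
 * placeholders for your own octree name and process count):
 *
 *	ray_pinit("scene.oct", NCORES);
 *	( queue rays and collect results as described below )
 *	ray_pdone(1);		( cleanup options are discussed below )
 *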
 * The ray queue buffers RAYQLEN rays before sending to
 * children, each of which may internally buffer RAYQLEN rays
 * during evaluation. Rays are not returned in the order
 * they are sent when multiple processes are open.
 *
 * Rays are queued and returned by a single
 * ray_pqueue() call. A ray_pqueue() return
 * value of 0 indicates that no rays are ready
 * and the queue is not yet full. A return value of 1
 * indicates that a ray was returned, though it is probably
 * not the one you just requested. Rays may be identified by
 * the rno member of the RAY struct, which is incremented
 * by the rayorigin() call, or may be set explicitly by
 * the caller. Below is an example call sequence:
 *
 *	myRay.rorg = ( ray origin point )
 *	myRay.rdir = ( normalized ray direction )
 *	myRay.rmax = ( maximum length, or zero for no limit )
 *	rayorigin(&myRay, PRIMARY, NULL, NULL);
 *	myRay.rno = ( my personal ray identifier )
 *	if (ray_pqueue(&myRay) == 1)
 *		{ do something with results }
 *
 * Note the differences between this and the simpler ray_trace()
 * call. In particular, the call may or may not return a value
 * in the passed ray structure. Also, you need to call rayorigin()
 * yourself, which is normally called for you by ray_trace(). The
 * benefit is that ray_pqueue() will trace rays faster in
 * proportion to the number of CPUs you have available on your
 * system. If the ray queue is full before the call, ray_pqueue()
 * will block until a result is ready so it can queue this one.
 * The global int ray_pnidle indicates the number of currently idle
 * children. If you want to check for completed rays without blocking,
 * or get the results from rays that have been queued without
 * queuing any new ones, the ray_presult() call is for you:
 *
 *	if (ray_presult(&myRay, 1) == 1)
 *		{ do something with results }
 *
 * If the second argument is 1, the call won't block when
 * results aren't ready, but will immediately return 0.
 * If the second argument is 0, the call will block
 * until a value is available, returning 0 only if the
 * queue is completely empty. A negative return value
 * indicates that a rendering process died. If this
 * happens, ray_pclose(0) is automatically called to close
 * all child processes, and ray_pnprocs is set to zero.
 *
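 * Putting these calls together, a complete rendering loop might
 * look like the following sketch (storeResult() is a hypothetical
 * placeholder for the caller's own code, and checks for negative
 * return values are omitted):
 *
 *	while (( more rays to trace )) {
 *		( set up myRay as in the example above )
 *		if (ray_pqueue(&myRay) == 1)
 *			storeResult(&myRay);
 *	}
 *	while (ray_presult(&myRay, 0) > 0)
 *		storeResult(&myRay);	( collect the stragglers )
 *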
 * If you just want to fill the ray queue without checking for
 * results, check ray_pnidle and call ray_psend():
 *
 *	while (ray_pnidle) {
 *		( set up ray )
 *		ray_psend(&myRay);
 *	}
 *
 * Note that it is a fatal error to call ray_psend() when
 * ray_pnidle is zero. The ray_presult() and/or ray_pqueue()
 * functions may be called subsequently to read back the results.
 *
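 * For instance, a polling read-back loop can interleave result
 * collection with other work (a sketch only; doOtherWork() and
 * storeResult() are hypothetical placeholders, and error checks
 * are omitted):
 *
 *	while (( results are still outstanding )) {
 *		if (ray_presult(&myRay, 1) == 1)
 *			storeResult(&myRay);
 *		else
 *			doOtherWork();
 *	}
 *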
 * When you are done, you may call ray_pdone(1) to close
 * all child processes and clean up memory used by Radiance.
 * Any queued ray calculations will be awaited and discarded.
 * As with ray_done(), ray_pdone(0) hangs onto data files
 * and fonts that are likely to be used in subsequent renderings.
 * Whether you need to clean up memory or not, you should
 * at least call ray_pclose(0) to await the child processes.
 *
 * Warning: You cannot affect any of the rendering processes
 * by changing global parameter values once ray_pinit() has
 * been called. Changing global parameters will have no effect
 * until the next call to ray_pinit(), which restarts everything.
 * If you just want to reap children so that you can alter the
 * rendering parameters without reloading the scene, use the
 * ray_pclose(0) and ray_popen(nproc) calls to close
 * and then restart the child processes after the changes are made.
 *
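 * That is, a parameter change without a scene reload would look
 * something like this sketch:
 *
 *	ray_pclose(0);		( reap the children )
 *	( change global rendering parameters here )
 *	ray_popen(nproc);	( fork fresh children )
 *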
 * Note: These routines are written to coordinate with the
 * definitions in raycalls.c, and in fact depend on them.
 * If you want to trace a ray and get a result synchronously,
 * use the ray_trace() call to compute it in the parent process.
 * This will not interfere with any subprocess calculations,
 * but beware that a fatal error may end with a call to quit().
 *
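 * Such a synchronous call in the parent follows the usual
 * raycalls.c sequence, e.g. (a sketch only):
 *
 *	myRay.rorg = ( ray origin point )
 *	myRay.rdir = ( normalized ray direction )
 *	myRay.rmax = ( maximum length, or zero for no limit )
 *	ray_trace(&myRay);	( rayorigin() is called for you )
 *	( results are now in myRay )
 *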
 * Note: One of the advantages of using separate processes
 * is that it gives the calling program some immunity from
 * fatal rendering errors. As discussed in raycalls.c,
 * Radiance tends to throw up its hands and exit at the
 * first sign of trouble, calling quit() to return control
 * to the top level. Although you can avoid exit() with
 * your own longjmp() in quit(), the cleanup afterwards
 * is always suspect. Through the use of subprocesses,
 * we avoid this pitfall by closing the processes and
 * returning a negative value from ray_pqueue() or
 * ray_presult(). If you get a negative value from either
 * of these calls, you can assume that the processes have
 * been cleaned up with a call to ray_pclose(), though you
 * will have to call ray_pdone() yourself if you want to
 * free memory. Obviously, you cannot continue rendering
 * without risking further errors, but otherwise your
 * process should not be compromised.
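 *
 * A defensive caller might therefore handle queue calls along
 * these lines (a sketch only; reportFailure() is a hypothetical
 * placeholder):
 *
 *	if (ray_pqueue(&myRay) < 0) {
 *		( children have already been closed at this point )
 *		ray_pdone(0);	( or ray_pdone(1) to free everything )
 *		reportFailure();
 *	}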
 */

#include "rtprocess.h"
#include "ray.h"
#include "ambient.h"
#include <sys/types.h>
#include <sys/wait.h>
#include "selcall.h"

#ifndef RAYQLEN
#define RAYQLEN		12		/* # rays to send at once */
#endif

#ifndef MAX_NPROCS
#if (FD_SETSIZE/2-4 < 64)
#define MAX_NPROCS	(FD_SETSIZE/2-4)
#else
#define MAX_NPROCS	64		/* max. # rendering processes */
#endif
#endif

extern char	*shm_boundary;		/* boundary of shared memory */

int	ray_pnprocs = 0;		/* number of child processes */
int	ray_pnidle = 0;			/* number of idle children */

static struct child_proc {
	int	pid;			/* child process id */
	int	fd_send;		/* write to child here */
	int	fd_recv;		/* read from child here */
	int	npending;		/* # rays in process */
	RNUMBER	rno[RAYQLEN];		/* working on these rays */
} r_proc[MAX_NPROCS];			/* our child processes */

static RAY	r_queue[2*RAYQLEN];	/* ray i/o buffer */
static int	r_send_next;		/* next send ray placement */
static int	r_recv_first;		/* position of first unreported ray */
static int	r_recv_next;		/* next received ray placement */

#define sendq_full()	(r_send_next >= RAYQLEN)

static int ray_pflush(void);
static void ray_pchild(int fd_in, int fd_out);


void
ray_pinit(			/* initialize ray-tracing processes */
	char	*otnm,
	int	nproc
)
{
	if (nobjects > 0)		/* close old calculation */
		ray_pdone(0);

	ray_init(otnm);			/* load the shared scene */

	r_send_next = 0;		/* set up queue */
	r_recv_first = r_recv_next = RAYQLEN;

	ray_popen(nproc);		/* fork children */
}


static int
ray_pflush(void)		/* send queued rays to idle children */
{
	int	nc, n, nw, i, sfirst;

	if ((ray_pnidle <= 0) | (r_send_next <= 0))
		return(0);		/* nothing we can send */

	sfirst = 0;			/* divvy up labor */
	nc = ray_pnidle;
	for (i = ray_pnprocs; nc && i--; ) {
		if (r_proc[i].npending > 0)
			continue;	/* child looks busy */
		n = (r_send_next - sfirst)/nc--;
		if (!n)
			continue;
					/* smuggle set size in crtype */
		r_queue[sfirst].crtype = n;
		nw = writebuf(r_proc[i].fd_send, (char *)&r_queue[sfirst],
				sizeof(RAY)*n);
		if (nw != sizeof(RAY)*n)
			return(-1);	/* write error */
		r_proc[i].npending = n;
		while (n--)		/* record ray IDs */
			r_proc[i].rno[n] = r_queue[sfirst+n].rno;
		sfirst += r_proc[i].npending;
		ray_pnidle--;		/* now she's busy */
	}
	if (sfirst != r_send_next)
		error(CONSISTENCY, "code screwup in ray_pflush");
	r_send_next = 0;
	return(sfirst);			/* return total # sent */
}


void
ray_psend(			/* add a ray to our send queue */
	RAY	*r
)
{
	if (r == NULL)
		return;
					/* flush output if necessary */
	if (sendq_full() && ray_pflush() <= 0)
		error(INTERNAL, "ray_pflush failed in ray_psend");

	r_queue[r_send_next++] = *r;
}


int
ray_pqueue(			/* queue a ray for computation */
	RAY	*r
)
{
	if (r == NULL)
		return(0);
					/* check for full send queue */
	if (sendq_full()) {
		RAY	mySend = *r;
					/* wait for a result */
		if (ray_presult(r, 0) <= 0)
			return(-1);
					/* put new ray in queue */
		r_queue[r_send_next++] = mySend;
				/* XXX r_send_next may now be > RAYQLEN */
		return(1);
	}
					/* else add ray to send queue */
	r_queue[r_send_next++] = *r;
					/* check for returned ray... */
	if (r_recv_first >= r_recv_next)
		return(0);
					/* ...one is sitting in queue */
	*r = r_queue[r_recv_first++];
	return(1);
}


int
ray_presult(			/* check for a completed ray */
	RAY	*r,
	int	poll
)
{
	static struct timeval	tpoll;	/* zero timeval struct */
	static fd_set	readset, errset;
	int	n, ok;
	register int	pn;

	if (r == NULL)
		return(0);
					/* check queued results first */
	if (r_recv_first < r_recv_next) {
		*r = r_queue[r_recv_first++];
		return(1);
	}
	n = ray_pnprocs - ray_pnidle;	/* pending before flush? */

	if (ray_pflush() < 0)		/* send new rays to process */
		return(-1);
					/* reset receive queue */
	r_recv_first = r_recv_next = RAYQLEN;

	if (!poll)			/* count newly sent unless polling */
		n = ray_pnprocs - ray_pnidle;
	if (n <= 0)			/* return if nothing to await */
		return(0);
	if (!poll && ray_pnprocs == 1)	/* one process -> skip select() */
		FD_SET(r_proc[0].fd_recv, &readset);

getready:				/* any children waiting for us? */
	for (pn = ray_pnprocs; pn--; )
		if (FD_ISSET(r_proc[pn].fd_recv, &readset) ||
				FD_ISSET(r_proc[pn].fd_recv, &errset))
			break;
					/* call select() if we must */
	if (pn < 0) {
		FD_ZERO(&readset); FD_ZERO(&errset); n = 0;
		for (pn = ray_pnprocs; pn--; ) {
			if (r_proc[pn].npending > 0)
				FD_SET(r_proc[pn].fd_recv, &readset);
			FD_SET(r_proc[pn].fd_recv, &errset);
			if (r_proc[pn].fd_recv >= n)
				n = r_proc[pn].fd_recv + 1;
		}
					/* find out who is ready */
		while ((n = select(n, &readset, (fd_set *)NULL, &errset,
				poll ? &tpoll : (struct timeval *)NULL)) < 0)
			if (errno != EINTR) {
				error(WARNING,
					"select call failed in ray_presult");
				ray_pclose(0);
				return(-1);
			}
		if (n > 0)		/* go back and get it */
			goto getready;
		return(0);		/* else poll came up empty */
	}
	if (r_recv_next + r_proc[pn].npending > sizeof(r_queue)/sizeof(RAY))
		error(CONSISTENCY, "buffer shortage in ray_presult()");

					/* read rendered ray data */
	n = readbuf(r_proc[pn].fd_recv, (char *)&r_queue[r_recv_next],
			sizeof(RAY)*r_proc[pn].npending);
	if (n > 0) {
		r_recv_next += n/sizeof(RAY);
		ok = (n == sizeof(RAY)*r_proc[pn].npending);
	} else
		ok = 0;
					/* reset child's status */
	FD_CLR(r_proc[pn].fd_recv, &readset);
	if (n <= 0)
		FD_CLR(r_proc[pn].fd_recv, &errset);
	r_proc[pn].npending = 0;
	ray_pnidle++;
					/* check for rendering errors */
	if (!ok) {
		ray_pclose(0);		/* process died -- clean up */
		return(-1);
	}
					/* preen returned rays */
	for (n = r_recv_next - r_recv_first; n--; ) {
		register RAY	*rp = &r_queue[r_recv_first + n];
		rp->rno = r_proc[pn].rno[n];
		rp->parent = NULL;
		rp->newcset = rp->clipset = NULL;
		rp->rox = NULL;
		rp->slights = NULL;
	}
					/* return first ray received */
	*r = r_queue[r_recv_first++];
	return(1);
}


void
ray_pdone(			/* reap children and free data */
	int	freall
)
{
	ray_pclose(0);			/* close child processes */

	if (shm_boundary != NULL) {	/* clear shared memory boundary */
		free((void *)shm_boundary);
		shm_boundary = NULL;
	}
	ray_done(freall);		/* free rendering data */
}


static void
ray_pchild(			/* process rays (never returns) */
	int	fd_in,
	int	fd_out
)
{
	int	n;
	register int	i;
					/* flag child process for quit() */
	ray_pnprocs = -1;
					/* read each ray request set */
	while ((n = read(fd_in, (char *)r_queue, sizeof(r_queue))) > 0) {
		int	n2;
		if (n < sizeof(RAY))
			break;
					/* get smuggled set length */
		n2 = sizeof(RAY)*r_queue[0].crtype - n;
		if (n2 < 0)
			error(INTERNAL, "buffer over-read in ray_pchild");
		if (n2 > 0) {		/* read the rest of the set */
			i = readbuf(fd_in, (char *)r_queue + n, n2);
			if (i != n2)
				break;
			n += n2;
		}
		n /= sizeof(RAY);
					/* evaluate rays */
		for (i = 0; i < n; i++) {
			r_queue[i].crtype = r_queue[i].rtype;
			r_queue[i].parent = NULL;
			r_queue[i].clipset = NULL;
			r_queue[i].slights = NULL;
			r_queue[i].rlvl = 0;
			samplendx++;
			rayclear(&r_queue[i]);
			rayvalue(&r_queue[i]);
		}
					/* write back our results */
		i = writebuf(fd_out, (char *)r_queue, sizeof(RAY)*n);
		if (i != sizeof(RAY)*n)
			error(SYSTEM, "write error in ray_pchild");
	}
	if (n)
		error(SYSTEM, "read error in ray_pchild");
	ambsync();
	quit(0);			/* normal exit */
}


void
ray_popen(			/* open the specified # processes */
	int	nadd
)
{
					/* check if our table has room */
	if (ray_pnprocs + nadd > MAX_NPROCS)
		nadd = MAX_NPROCS - ray_pnprocs;
	if (nadd <= 0)
		return;
	ambsync();			/* load any new ambient values */
	if (shm_boundary == NULL) {	/* first child process? */
		preload_objs();		/* preload auxiliary data */
					/* set shared memory boundary */
		shm_boundary = (char *)malloc(16);
		strcpy(shm_boundary, "SHM_BOUNDARY");
	}
	fflush(NULL);			/* clear pending output */
	while (nadd--) {		/* fork each new process */
		int	p0[2], p1[2];
		if (pipe(p0) < 0 || pipe(p1) < 0)
			error(SYSTEM, "cannot create pipe");
		if ((r_proc[ray_pnprocs].pid = fork()) == 0) {
			int	pn;	/* close others' descriptors */
			for (pn = ray_pnprocs; pn--; ) {
				close(r_proc[pn].fd_send);
				close(r_proc[pn].fd_recv);
			}
			close(p0[0]); close(p1[1]);
					/* following call never returns */
			ray_pchild(p1[0], p0[1]);
		}
		if (r_proc[ray_pnprocs].pid < 0)
			error(SYSTEM, "cannot fork child process");
		close(p1[0]); close(p0[1]);
		/*
		 * Close write stream on exec to avoid multiprocessing deadlock.
		 * No use in read stream without it, so set flag there as well.
		 */
		fcntl(p1[1], F_SETFD, FD_CLOEXEC);
		fcntl(p0[0], F_SETFD, FD_CLOEXEC);
		r_proc[ray_pnprocs].fd_send = p1[1];
		r_proc[ray_pnprocs].fd_recv = p0[0];
		r_proc[ray_pnprocs].npending = 0;
		ray_pnprocs++;
		ray_pnidle++;
	}
}


void
ray_pclose(			/* close one or more child processes */
	int	nsub
)
{
	static int	inclose = 0;
	RAY	res;
					/* check recursion */
	if (inclose)
		return;
	inclose++;
					/* check argument */
	if ((nsub <= 0) | (nsub > ray_pnprocs))
		nsub = ray_pnprocs;
					/* clear our ray queue */
	while (ray_presult(&res,0) > 0)
		;
					/* clean up children */
	while (nsub--) {
		int	status;
		ray_pnprocs--;
		close(r_proc[ray_pnprocs].fd_recv);
		close(r_proc[ray_pnprocs].fd_send);
		if (waitpid(r_proc[ray_pnprocs].pid, &status, 0) < 0)
			status = 127<<8;
		if (status) {
			sprintf(errmsg,
				"rendering process %d exited with code %d",
					r_proc[ray_pnprocs].pid, status>>8);
			error(WARNING, errmsg);
		}
		ray_pnidle--;
	}
	inclose--;
}


void
quit(ec)			/* make sure exit is called */
int	ec;
{
	if (ray_pnprocs > 0)		/* close children if any */
		ray_pclose(0);
	exit(ec);
}