ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
(Generate patch)

Comparing ray/src/rt/rc3.c (file contents):
Revision 2.1 by greg, Sat Jun 9 07:16:47 2012 UTC vs.
Revision 2.10 by greg, Sat Jun 16 17:09:49 2012 UTC

# Line 11 | Line 11 | static const char RCSid[] = "$Id$";
11   #include "rtprocess.h"
12   #include "selcall.h"
13  
14 + #define MAXIQ           (int)(PIPE_BUF/(sizeof(FVECT)*2))
15 +
16   /* Modifier contribution queue (results waiting to be output) */
17   typedef struct s_binq {
18 <        int             ndx;            /* index for this entry */
19 <        int             n2add;          /* number left to add */
18 >        RNUMBER         ndx;            /* index for this entry */
19 >        RNUMBER         nadded;         /* accumulated so far */
20          struct s_binq   *next;          /* next in queue */
21          MODCONT         *mca[1];        /* contrib. array (extends struct) */
22   } BINQ;
# Line 22 | Line 24 | typedef struct s_binq {
24   static BINQ     *out_bq = NULL;         /* output bin queue */
25   static BINQ     *free_bq = NULL;        /* free queue entries */
26  
27 < static SUBPROC  kida[MAXPROCESS];       /* child processes */
27 > static struct {
28 >        RNUMBER r1;                     /* assigned ray starting index */
29 >        SUBPROC pr;                     /* PID, i/o descriptors */
30 >        FILE    *infp;                  /* file pointer to read from process */
31 >        int     nr;                     /* number of rays to sum (0 if free) */
32 > } kida[MAXPROCESS];             /* our child processes */
33  
34  
35 < /* Get new (empty) bin queue entry */
35 > /* Get new bin queue entry */
36   static BINQ *
37   new_binq()
38   {
39 <        BINQ    *bp = free_bq;
39 >        BINQ    *bp;
40          int     i;
41  
42 <        if (bp != NULL) {               /* something already available? */
42 >        if (free_bq != NULL) {          /* something already available? */
43 >                bp = free_bq;
44                  free_bq = bp->next;
45                  bp->next = NULL;
46 <                bp->n2add = accumulate-1;
46 >                bp->nadded = 0;
47                  return(bp);
48          }
49                                          /* else allocate fresh */
50 <        bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
50 >        bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
51          if (bp == NULL)
52                  goto memerr;
53          for (i = nmods; i--; ) {
# Line 52 | Line 60 | new_binq()
60                  /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
61          }
62          bp->ndx = 0;
63 <        bp->n2add = accumulate-1;
63 >        bp->nadded = 0;
64          bp->next = NULL;
65          return(bp);
66   memerr:
# Line 88 | Line 96 | free_binq(BINQ *bp)
96  
97  
98   /* Add modifier values to accumulation record in queue and clear */
99 < void
99 > static void
100   queue_modifiers()
101   {
102          MODCONT *mpin, *mpout;
# Line 104 | Line 112 | queue_modifiers()
112                          addcolor(mpout->cbin[j], mpin->cbin[j]);
113                  memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
114          }
115 +        out_bq->nadded++;
116   }
117  
118  
119 < /* Sum one modifier record into another (doesn't update n2add) */
119 > /* Sum one modifier record into another (updates nadded) */
120   static void
121   add_modbin(BINQ *dst, BINQ *src)
122   {
# Line 119 | Line 128 | add_modbin(BINQ *dst, BINQ *src)
128                  for (j = mpout->nbins; j--; )
129                          addcolor(mpout->cbin[j], mpin->cbin[j]);
130          }
131 +        dst->nadded += src->nadded;
132   }
133  
134  
135 < /* Queue output, catching up with and freeing FIFO entries when possible */
136 < static int
135 > /* Queue values for later output */
136 > static void
137   queue_output(BINQ *bp)
138   {
129        int     nout = 0;
139          BINQ    *b_last, *b_cur;
131        int     i;
140  
141          if (accumulate <= 0) {          /* just accumulating? */
142                  if (out_bq == NULL) {
# Line 138 | Line 146 | queue_output(BINQ *bp)
146                          add_modbin(out_bq, bp);
147                          free_binq(bp);
148                  }
149 <                return(0);
149 >                return;
150          }
151 <        b_last = NULL;                  /* else insert in output queue */
151 >        b_last = NULL;                  /* insert in output queue */
152          for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
153                                  b_cur = b_cur->next)
154                  b_last = b_cur;
155 +
156          if (b_last != NULL) {
157                  bp->next = b_cur;
158                  b_last->next = bp;
# Line 151 | Line 160 | queue_output(BINQ *bp)
160                  bp->next = out_bq;
161                  out_bq = bp;
162          }
163 <        if (accumulate > 1) {           /* merge accumulation entries */
164 <                b_cur = out_bq;
165 <                while (b_cur->next != NULL) {
166 <                        if (b_cur->n2add <= 0 ||
167 <                                        (b_cur->ndx-1)/accumulate !=
168 <                                        (b_cur->next->ndx-1)/accumulate) {
169 <                                b_cur = b_cur->next;
170 <                                continue;
171 <                        }
163 <                        add_modbin(b_cur, b_cur->next);
164 <                        b_cur->n2add--;
165 <                        b_last = b_cur->next;
166 <                        b_cur->next = b_last->next;
167 <                        b_last->next = NULL;
168 <                        free_binq(b_last);
163 >        if (accumulate == 1)            /* no accumulation? */
164 >                return;
165 >        b_cur = out_bq;                 /* else merge accumulation entries */
166 >        while (b_cur->next != NULL) {
167 >                if (b_cur->nadded >= accumulate ||
168 >                                (b_cur->ndx-1)/accumulate !=
169 >                                (b_cur->next->ndx-1)/accumulate) {
170 >                        b_cur = b_cur->next;
171 >                        continue;
172                  }
173 +                add_modbin(b_cur, b_cur->next);
174 +                b_last = b_cur->next;
175 +                b_cur->next = b_last->next;
176 +                b_last->next = NULL;
177 +                free_binq(b_last);
178          }
179 + }
180 +
181 +
182 + /* Count number of records ready for output */
183 + static int
184 + queue_ready()
185 + {
186 +        int     nready = 0;
187 +        BINQ    *bp;
188 +
189 +        for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
190 +                                bp->ndx == lastdone+nready*accumulate+1;
191 +                                bp = bp->next)
192 +                ++nready;
193 +
194 +        return(nready);
195 + }
196 +
197 +
198 + /* Catch up with output queue by producing ready results */
199 + static int
200 + output_catchup(int nmax)
201 + {
202 +        int     nout = 0;
203 +        BINQ    *bp;
204 +        int     i;
205                                          /* output ready results */
206 <        while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
207 <                b_cur = out_bq;                 /* pop off first entry */
208 <                out_bq = b_cur->next;
209 <                b_cur->next = NULL;
206 >        while (out_bq != NULL && out_bq->nadded >= accumulate
207 >                                && out_bq->ndx == lastdone+1) {
208 >                if ((nmax > 0) & (nout >= nmax))
209 >                        break;
210 >                bp = out_bq;                    /* pop off first entry */
211 >                out_bq = bp->next;
212 >                bp->next = NULL;
213                  for (i = 0; i < nmods; i++)     /* output record */
214 <                        mod_output(b_cur->mca[i]);
214 >                        mod_output(bp->mca[i]);
215                  end_record();
216 <                free_binq(b_cur);               /* free this entry */
216 >                free_binq(bp);                  /* free this entry */
217                  lastdone += accumulate;
218                  ++nout;
219          }
# Line 184 | Line 221 | queue_output(BINQ *bp)
221   }
222  
223  
224 < /* Put a zero record in results queue */
224 > /* Put a zero record in results queue & output */
225   void
226 < zero_record(int ndx)
226 > put_zero_record(int ndx)
227   {
228          BINQ    *bp = new_binq();
229          int     i;
# Line 194 | Line 231 | zero_record(int ndx)
231          for (i = nmods; i--; )
232                  memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
233          bp->ndx = ndx;
234 +        bp->nadded = 1;
235          queue_output(bp);
236 +        output_catchup(0);
237   }
238  
239  
240 + /* Get results from child process and add to queue */
241 + static void
242 + queue_results(int k)
243 + {
244 +        BINQ    *bq = new_binq();       /* get results holder */
245 +        int     j;
246 +
247 +        bq->ndx = kida[k].r1;
248 +        bq->nadded = kida[k].nr;
249 +                                        /* read from child */
250 +        for (j = 0; j < nmods; j++)
251 +                if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
252 +                                        kida[k].infp) != bq->mca[j]->nbins)
253 +                        error(SYSTEM, "read error from render process");
254 +                        
255 +        queue_output(bq);               /* put results in output queue */
256 +        kida[k].nr = 0;                 /* mark child as available */
257 + }
258 +
259 +
260   /* callback to set output spec to NULL (stdout) */
261   static int
262   set_stdout(const LUENT *le, void *p)
# Line 211 | Line 270 | set_stdout(const LUENT *le, void *p)
270   int
271   in_rchild()
272   {
273 < #ifdef _WIN32
274 <        error(WARNING, "multiprocessing unsupported -- running solo");
275 <        nproc = 1;
276 <        return(1);
277 < #else
278 <                                        /* try to fork ourselves */
279 <        while (nchild < nproc) {
280 <                int     p0[2], p1[2];
281 <                int     pid;
282 <                                        /* prepare i/o pipes */
283 <                if (pipe(p0) < 0 || pipe(p1) < 0)
284 <                        error(SYSTEM, "pipe() call failed!");
285 <                pid = fork();           /* fork parent process */
286 <                if (pid == 0) {         /* if in child, set up & return true */
228 <                        close(p0[1]); close(p1[0]);
229 <                        dup2(p0[0], 0); close(p0[0]);
230 <                        dup2(p1[1], 1); close(p1[1]);
273 >        int     rval;
274 >
275 >        while (nchild < nproc) {        /* fork until target reached */
276 >                errno = 0;
277 >                rval = open_process(&kida[nchild].pr, NULL);
278 >                if (rval < 0)
279 >                        error(SYSTEM, "open_process() call failed");
280 >                if (rval == 0) {        /* if in child, set up & return true */
281 >                        lu_doall(&modconttab, set_stdout, NULL);
282 >                        lu_done(&ofiletab);
283 >                        while (nchild--) {      /* don't share other pipes */
284 >                                close(kida[nchild].pr.w);
285 >                                fclose(kida[nchild].infp);
286 >                        }
287                          inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
288                          outfmt = 'd';
289                          header = 0;
234                        waitflush = xres = 1;
290                          yres = 0;
291                          raysleft = 0;
292 <                        account = accumulate = 1;
293 <                        lu_doall(&modconttab, set_stdout, NULL);
294 <                        nchild = -1;
295 <                        return(1);      /* child return value */
292 >                        if (accumulate == 1) {
293 >                                waitflush = xres = 1;
294 >                                account = accumulate = 1;
295 >                        } else {        /* parent controls accumulation */
296 >                                waitflush = xres = 0;
297 >                                account = accumulate = 0;
298 >                        }
299 >                        return(1);      /* return "true" in child */
300                  }
301 <                if (pid < 0)
302 <                        error(SYSTEM, "fork() call failed!");
303 <                                        /* connect our pipes */
304 <                close(p0[0]); close(p1[1]);
305 <                kida[nchild].r = p1[0];
306 <                kida[nchild].w = p0[1];
307 <                kida[nchild].pid = pid;
308 <                kida[nchild].running = -1;
250 <                ++nchild;
251 <        }
252 <        return(0);                      /* parent return value */
301 >                if (rval != PIPE_BUF)
302 >                        error(CONSISTENCY, "bad value from open_process()");
303 >                                        /* connect to child's output */
304 >                kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
305 >                if (kida[nchild].infp == NULL)
306 >                        error(SYSTEM, "out of memory in in_rchild()");
307 > #ifdef getc_unlocked
308 >                flockfile(kida[nchild].infp);   /* avoid mutex overhead */
309   #endif
310 +                kida[nchild++].nr = 0;  /* mark as available */
311 +        }
312 +        return(0);                      /* return "false" in parent */
313   }
314  
315  
# Line 260 | Line 319 | end_children()
319   {
320          int     status;
321          
322 <        while (nchild-- > 0)
323 <                if ((status = close_process(&kida[nchild])) > 0) {
322 >        while (nchild > 0) {
323 >                nchild--;
324 >                if ((status = close_process(&kida[nchild].pr)) > 0) {
325                          sprintf(errmsg,
326                                  "rendering process returned bad status (%d)",
327                                          status);
328                          error(WARNING, errmsg);
329                  }
330 +                fclose(kida[nchild].infp);
331 +        }
332   }
333  
334  
335 < /* Wait for the next available child */
335 > /* Wait for the next available child, managing output queue simultaneously */
336   static int
337 < next_child_nq(int force_wait)
337 > next_child_nq(int flushing)
338   {
339 <        fd_set  readset, errset;
340 <        int     i, j, n, nr;
339 >        static struct timeval   polling;
340 >        struct timeval          *pmode;
341 >        fd_set                  readset, errset;
342 >        int                     i, n, nr, nqr;
343  
344 <        if (!force_wait)                /* see if there's one free */
344 >        if (!flushing)                  /* see if there's one free */
345                  for (i = nchild; i--; )
346 <                        if (kida[i].running < 0)
346 >                        if (!kida[i].nr)
347                                  return(i);
348 +
349 +        nqr = queue_ready();            /* choose blocking mode or polling */
350 +        if ((nqr > 0) & !flushing)
351 +                pmode = &polling;
352 +        else
353 +                pmode = NULL;
354 + tryagain:                               /* catch up with output? */
355 +        if (pmode == &polling) {
356 +                if (nqr > nchild)       /* don't get too far behind */
357 +                        nqr -= output_catchup(nqr-nchild);
358 +        } else if (nqr > 0)             /* clear output before blocking */
359 +                nqr -= output_catchup(0);
360                                          /* prepare select() call */
361          FD_ZERO(&readset); FD_ZERO(&errset);
362          n = nr = 0;
363          for (i = nchild; i--; ) {
364 <                if (kida[i].running > 0) {
365 <                        FD_SET(kida[i].r, &readset);
364 >                if (kida[i].nr) {
365 >                        FD_SET(kida[i].pr.r, &readset);
366                          ++nr;
367                  }
368 <                FD_SET(kida[i].r, &errset);
369 <                if (kida[i].r >= n)
370 <                        n = kida[i].r + 1;
368 >                FD_SET(kida[i].pr.r, &errset);
369 >                if (kida[i].pr.r >= n)
370 >                        n = kida[i].pr.r + 1;
371          }
372 <        if (!nr)                        /* nothing going on */
372 >        if (!nr)                        /* nothing to wait for? */
373                  return(-1);
374 <        if (nr > 1) {                   /* call select if multiple busy */
374 >        if ((nr > 1) | (pmode == &polling)) {
375                  errno = 0;
376 <                if (select(n, &readset, NULL, &errset, NULL) < 0)
377 <                        error(SYSTEM, "select call error in next_child_nq()");
376 >                nr = select(n, &readset, NULL, &errset, pmode);
377 >                if (!nr) {
378 >                        pmode = NULL;   /* try again, blocking this time */
379 >                        goto tryagain;
380 >                }
381 >                if (nr < 0)
382 >                        error(SYSTEM, "select() error in next_child_nq()");
383          } else
384                  FD_ZERO(&errset);
385          n = -1;                         /* read results from child(ren) */
386          for (i = nchild; i--; ) {
387 <                BINQ    *bq;
307 <                if (FD_ISSET(kida[i].r, &errset))
387 >                if (FD_ISSET(kida[i].pr.r, &errset))
388                          error(USER, "rendering process died");
389 <                if (!FD_ISSET(kida[i].r, &readset))
390 <                        continue;
311 <                bq = new_binq();        /* get results holder */
312 <                bq->ndx = kida[i].running;
313 <                                        /* read from child */
314 <                for (j = 0; j < nmods; j++) {
315 <                        n = sizeof(DCOLOR)*bq->mca[j]->nbins;
316 <                        nr = readbuf(kida[i].r, (char *)bq->mca[j]->cbin, n);
317 <                        if (nr != n)
318 <                                error(SYSTEM, "read error from render process");
319 <                }
320 <                queue_output(bq);       /* put results in output queue */
321 <                kida[i].running = -1;   /* mark child as available */
322 <                n = i;
389 >                if (FD_ISSET(kida[i].pr.r, &readset))
390 >                        queue_results(n = i);
391          }
392 <        return(n);                      /* last available child */
392 >        return(n);                      /* first available child */
393   }
394  
395  
# Line 330 | Line 398 | void
398   parental_loop()
399   {
400          static int      ignore_warning_given = 0;
401 <        FVECT           orgdir[2];
402 <        double          d;
401 >        int             qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
402 >        int             ninq = 0;
403 >        FVECT           orgdir[2*MAXIQ];
404 >        int             i, n;
405                                          /* load rays from stdin & process */
406   #ifdef getc_unlocked
407          flockfile(stdin);               /* avoid lock/unlock overhead */
408   #endif
409 <        while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
410 <                d = normalize(orgdir[1]);
411 <                                                        /* asking for flush? */
412 <                if ((d == 0.0) & (accumulate != 1)) {
413 <                        if (!ignore_warning_given++)
414 <                                error(WARNING,
409 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
410 >                if (orgdir[2*ninq+1][0] == 0.0 &&       /* asking for flush? */
411 >                                (orgdir[2*ninq+1][1] == 0.0) &
412 >                                (orgdir[2*ninq+1][2] == 0.0)) {
413 >                        if (accumulate != 1) {
414 >                                if (!ignore_warning_given++)
415 >                                        error(WARNING,
416                                  "dummy ray(s) ignored during accumulation\n");
417 <                        continue;
418 <                }
348 <                if ((d == 0.0) | (lastray+1 < lastray)) {
417 >                                continue;
418 >                        }
419                          while (next_child_nq(1) >= 0)
420 <                                ;
420 >                                ;                       /* clear the queue */
421                          lastdone = lastray = 0;
352                }
353                if (d == 0.0) {
422                          if ((yres <= 0) | (xres <= 0))
423 <                                waitflush = 1;          /* flush right after */
424 <                        zero_record(++lastray);
425 <                } else {                                /* else assign */
426 <                        int     avail = next_child_nq(0);
427 <                        if (writebuf(kida[avail].w, (char *)orgdir,
428 <                                        sizeof(FVECT)*2) != sizeof(FVECT)*2)
423 >                                waitflush = 1;          /* flush next */
424 >                        put_zero_record(++lastray);
425 >                } else if (++ninq >= qlimit ||
426 >                            lastray/accumulate != (lastray+ninq)/accumulate) {
427 >                        i = next_child_nq(0);           /* manages output */
428 >                        n = ninq;
429 >                        if (accumulate != 1)            /* request flush? */
430 >                                memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
431 >                        n *= sizeof(FVECT)*2;           /* send assignment */
432 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
433                                  error(SYSTEM, "pipe write error");
434 <                        kida[avail].running = ++lastray;
434 >                        kida[i].r1 = lastray+1;
435 >                        lastray += kida[i].nr = ninq;   /* mark as busy */
436 >                        ninq = 0;
437 >                        if (lastray < lastdone) {       /* RNUMBER wrapped? */
438 >                                while (next_child_nq(1) >= 0)
439 >                                        ;
440 >                                lastdone = lastray = 0;
441 >                        }
442                  }
443                  if (raysleft && !--raysleft)
444                          break;                          /* preemptive EOI */
445          }
446          while (next_child_nq(1) >= 0)           /* empty results queue */
447                  ;
448 <                                                /* output accumulated record */
449 <        if (accumulate <= 0 || account < accumulate) {
450 <                int     i;
451 <                if (account < accumulate) {
373 <                        error(WARNING, "partial accumulation in final record");
374 <                        accumulate -= account;
375 <                }
376 <                for (i = 0; i < nmods; i++)
377 <                        mod_output(out_bq->mca[i]);
378 <                end_record();
448 >        if (account < accumulate) {
449 >                error(WARNING, "partial accumulation in final record");
450 >                free_binq(out_bq);              /* XXX just ignore it */
451 >                out_bq = NULL;
452          }
453 +        free_binq(NULL);                        /* clean up */
454 +        lu_done(&ofiletab);
455          if (raysleft)
456                  error(USER, "unexpected EOF on input");
457 <        free_binq(NULL);                        /* clean up */
457 > }
458 >
459 >
460 > /* Wait for the next available child by monitoring "to" pipes */
461 > static int
462 > next_child_ready()
463 > {
464 >        fd_set                  writeset, errset;
465 >        int                     i, n, nqr;
466 >
467 >        for (i = nchild; i--; )         /* see if there's one free first */
468 >                if (!kida[i].nr)
469 >                        return(i);
470 >                                        /* prepare select() call */
471 >        FD_ZERO(&writeset); FD_ZERO(&errset);
472 >        n = 0;
473 >        for (i = nchild; i--; ) {
474 >                FD_SET(kida[i].pr.w, &writeset);
475 >                FD_SET(kida[i].pr.r, &errset);
476 >                if (kida[i].pr.w >= n)
477 >                        n = kida[i].pr.w + 1;
478 >                if (kida[i].pr.r >= n)
479 >                        n = kida[i].pr.r + 1;
480 >        }
481 >        errno = 0;
482 >        n = select(n, NULL, &writeset, &errset, NULL);
483 >        if (n < 0)
484 >                error(SYSTEM, "select() error in next_child_ready()");
485 >        n = -1;                         /* identify waiting child */
486 >        for (i = nchild; i--; ) {
487 >                if (FD_ISSET(kida[i].pr.r, &errset))
488 >                        error(USER, "rendering process died");
489 >                if (FD_ISSET(kida[i].pr.w, &writeset))
490 >                        kida[n = i].nr = 0;
491 >        }
492 >        return(n);                      /* first available child */
493 > }
494 >
495 >
496 > /* Modified parental loop for full accumulation mode (-c 0) */
497 > void
498 > feeder_loop()
499 > {
500 >        static int      ignore_warning_given = 0;
501 >        int             ninq = 0;
502 >        FVECT           orgdir[2*MAXIQ];
503 >        int             i, n;
504 >                                        /* load rays from stdin & process */
505 > #ifdef getc_unlocked
506 >        flockfile(stdin);               /* avoid lock/unlock overhead */
507 > #endif
508 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
509 >                if (orgdir[2*ninq+1][0] == 0.0 &&       /* asking for flush? */
510 >                                (orgdir[2*ninq+1][1] == 0.0) &
511 >                                (orgdir[2*ninq+1][2] == 0.0)) {
512 >                        if (!ignore_warning_given++)
513 >                                error(WARNING,
514 >                                "dummy ray(s) ignored during accumulation\n");
515 >                        continue;
516 >                }
517 >                if (++ninq >= MAXIQ) {
518 >                        i = next_child_ready();         /* get eager child */
519 >                        n = sizeof(FVECT)*2 * ninq;     /* give assignment */
520 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
521 >                                error(SYSTEM, "pipe write error");
522 >                        kida[i].r1 = lastray+1;
523 >                        lastray += kida[i].nr = ninq;
524 >                        ninq = 0;
525 >                        if (lastray < lastdone)         /* RNUMBER wrapped? */
526 >                                lastdone = lastray = 0;
527 >                }
528 >                if (raysleft && !--raysleft)
529 >                        break;                          /* preemptive EOI */
530 >        }
531 >        if (ninq) {                             /* polish off input */
532 >                i = next_child_ready();
533 >                n = sizeof(FVECT)*2 * ninq;
534 >                if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
535 >                        error(SYSTEM, "pipe write error");
536 >                kida[i].r1 = lastray+1;
537 >                lastray += kida[i].nr = ninq;
538 >                ninq = 0;
539 >        }
540 >        for (i = nchild; i--; ) {               /* get results */
541 >                close(kida[i].pr.w);
542 >                queue_results(i);
543 >        }
544 >        if (recover)                            /* and from before? */
545 >                queue_modifiers();
546 >        end_children();                         /* free up file descriptors */
547 >        for (i = 0; i < nmods; i++)
548 >                mod_output(out_bq->mca[i]);     /* output accumulated record */
549 >        end_record();
550 >        free_binq(out_bq);                      /* clean up */
551 >        out_bq = NULL;
552 >        free_binq(NULL);
553          lu_done(&ofiletab);
554 +        if (raysleft)
555 +                error(USER, "unexpected EOF on input");
556   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines