ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/rc3.c
(Generate patch)

Comparing ray/src/rt/rc3.c (file contents):
Revision 2.1 by greg, Sat Jun 9 07:16:47 2012 UTC vs.
Revision 2.20 by greg, Sun Aug 23 00:17:12 2015 UTC

# Line 6 | Line 6 | static const char RCSid[] = "$Id$";
6   * Controlling process for multiple children
7   */
8  
9 + #include <signal.h>
10   #include "rcontrib.h"
10 #include "platform.h"
11 #include "rtprocess.h"
11   #include "selcall.h"
12  
13 + #define MAXIQ           (int)(PIPE_BUF/(sizeof(FVECT)*2))
14 +
15   /* Modifier contribution queue (results waiting to be output) */
16   typedef struct s_binq {
17 <        int             ndx;            /* index for this entry */
18 <        int             n2add;          /* number left to add */
17 >        RNUMBER         ndx;            /* index for this entry */
18 >        RNUMBER         nadded;         /* accumulated so far */
19          struct s_binq   *next;          /* next in queue */
20          MODCONT         *mca[1];        /* contrib. array (extends struct) */
21   } BINQ;
# Line 22 | Line 23 | typedef struct s_binq {
23   static BINQ     *out_bq = NULL;         /* output bin queue */
24   static BINQ     *free_bq = NULL;        /* free queue entries */
25  
26 < static SUBPROC  kida[MAXPROCESS];       /* child processes */
26 > static struct {
27 >        RNUMBER r1;                     /* assigned ray starting index */
28 >        SUBPROC pr;                     /* PID, i/o descriptors */
29 >        FILE    *infp;                  /* file pointer to read from process */
30 >        int     nr;                     /* number of rays to sum (0 if free) */
31 > } kida[MAXPROCESS];             /* our child processes */
32  
33  
34 < /* Get new (empty) bin queue entry */
34 > /* Get new bin queue entry */
35   static BINQ *
36   new_binq()
37   {
38 <        BINQ    *bp = free_bq;
38 >        BINQ    *bp;
39          int     i;
40  
41 <        if (bp != NULL) {               /* something already available? */
41 >        if (free_bq != NULL) {          /* something already available? */
42 >                bp = free_bq;
43                  free_bq = bp->next;
44                  bp->next = NULL;
45 <                bp->n2add = accumulate-1;
45 >                bp->nadded = 0;
46                  return(bp);
47          }
48                                          /* else allocate fresh */
49 <        bp = (BINQ *)malloc(sizeof(BINQ)+(nmods-1)*sizeof(MODCONT *));
49 >        bp = (BINQ *)malloc(sizeof(BINQ) + sizeof(MODCONT *)*(nmods-1));
50          if (bp == NULL)
51                  goto memerr;
52          for (i = nmods; i--; ) {
# Line 52 | Line 59 | new_binq()
59                  /* memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*mp->nbins); */
60          }
61          bp->ndx = 0;
62 <        bp->n2add = accumulate-1;
62 >        bp->nadded = 0;
63          bp->next = NULL;
64          return(bp);
65   memerr:
# Line 88 | Line 95 | free_binq(BINQ *bp)
95  
96  
97   /* Add modifier values to accumulation record in queue and clear */
98 < void
98 > static void
99   queue_modifiers()
100   {
101          MODCONT *mpin, *mpout;
# Line 104 | Line 111 | queue_modifiers()
111                          addcolor(mpout->cbin[j], mpin->cbin[j]);
112                  memset(mpin->cbin, 0, sizeof(DCOLOR)*mpin->nbins);
113          }
114 +        out_bq->nadded++;
115   }
116  
117  
118 < /* Sum one modifier record into another (doesn't update n2add) */
118 > /* Sum one modifier record into another (updates nadded) */
119   static void
120   add_modbin(BINQ *dst, BINQ *src)
121   {
# Line 119 | Line 127 | add_modbin(BINQ *dst, BINQ *src)
127                  for (j = mpout->nbins; j--; )
128                          addcolor(mpout->cbin[j], mpin->cbin[j]);
129          }
130 +        dst->nadded += src->nadded;
131   }
132  
133  
134 < /* Queue output, catching up with and freeing FIFO entries when possible */
135 < static int
134 > /* Queue values for later output */
135 > static void
136   queue_output(BINQ *bp)
137   {
129        int     nout = 0;
138          BINQ    *b_last, *b_cur;
131        int     i;
139  
140          if (accumulate <= 0) {          /* just accumulating? */
141                  if (out_bq == NULL) {
# Line 138 | Line 145 | queue_output(BINQ *bp)
145                          add_modbin(out_bq, bp);
146                          free_binq(bp);
147                  }
148 <                return(0);
148 >                return;
149          }
150 <        b_last = NULL;                  /* else insert in output queue */
150 >        b_last = NULL;                  /* insert in output queue */
151          for (b_cur = out_bq; b_cur != NULL && b_cur->ndx < bp->ndx;
152                                  b_cur = b_cur->next)
153                  b_last = b_cur;
154 +
155          if (b_last != NULL) {
156                  bp->next = b_cur;
157                  b_last->next = bp;
# Line 151 | Line 159 | queue_output(BINQ *bp)
159                  bp->next = out_bq;
160                  out_bq = bp;
161          }
162 <        if (accumulate > 1) {           /* merge accumulation entries */
163 <                b_cur = out_bq;
164 <                while (b_cur->next != NULL) {
165 <                        if (b_cur->n2add <= 0 ||
166 <                                        (b_cur->ndx-1)/accumulate !=
167 <                                        (b_cur->next->ndx-1)/accumulate) {
168 <                                b_cur = b_cur->next;
169 <                                continue;
170 <                        }
163 <                        add_modbin(b_cur, b_cur->next);
164 <                        b_cur->n2add--;
165 <                        b_last = b_cur->next;
166 <                        b_cur->next = b_last->next;
167 <                        b_last->next = NULL;
168 <                        free_binq(b_last);
162 >        if (accumulate == 1)            /* no accumulation? */
163 >                return;
164 >        b_cur = out_bq;                 /* else merge accumulation entries */
165 >        while (b_cur->next != NULL) {
166 >                if (b_cur->nadded >= accumulate ||
167 >                                (b_cur->ndx-1)/accumulate !=
168 >                                (b_cur->next->ndx-1)/accumulate) {
169 >                        b_cur = b_cur->next;
170 >                        continue;
171                  }
172 +                add_modbin(b_cur, b_cur->next);
173 +                b_last = b_cur->next;
174 +                b_cur->next = b_last->next;
175 +                b_last->next = NULL;
176 +                free_binq(b_last);
177          }
178 + }
179 +
180 +
181 + /* Count number of records ready for output */
182 + static int
183 + queue_ready()
184 + {
185 +        int     nready = 0;
186 +        BINQ    *bp;
187 +
188 +        for (bp = out_bq; bp != NULL && bp->nadded >= accumulate &&
189 +                                bp->ndx == lastdone+nready*accumulate+1;
190 +                                bp = bp->next)
191 +                ++nready;
192 +
193 +        return(nready);
194 + }
195 +
196 +
197 + /* Catch up with output queue by producing ready results */
198 + static int
199 + output_catchup(int nmax)
200 + {
201 +        int     nout = 0;
202 +        BINQ    *bp;
203 +        int     i;
204                                          /* output ready results */
205 <        while (out_bq != NULL && (out_bq->ndx == lastdone+1) & !out_bq->n2add) {
206 <                b_cur = out_bq;                 /* pop off first entry */
207 <                out_bq = b_cur->next;
208 <                b_cur->next = NULL;
205 >        while (out_bq != NULL && out_bq->nadded >= accumulate
206 >                                && out_bq->ndx == lastdone+1) {
207 >                if ((nmax > 0) & (nout >= nmax))
208 >                        break;
209 >                bp = out_bq;                    /* pop off first entry */
210 >                out_bq = bp->next;
211 >                bp->next = NULL;
212                  for (i = 0; i < nmods; i++)     /* output record */
213 <                        mod_output(b_cur->mca[i]);
213 >                        mod_output(bp->mca[i]);
214                  end_record();
215 <                free_binq(b_cur);               /* free this entry */
215 >                free_binq(bp);                  /* free this entry */
216                  lastdone += accumulate;
217                  ++nout;
218          }
# Line 184 | Line 220 | queue_output(BINQ *bp)
220   }
221  
222  
223 < /* Put a zero record in results queue */
223 > /* Put a zero record in results queue & output */
224   void
225 < zero_record(int ndx)
225 > put_zero_record(int ndx)
226   {
227          BINQ    *bp = new_binq();
228          int     i;
# Line 194 | Line 230 | zero_record(int ndx)
230          for (i = nmods; i--; )
231                  memset(bp->mca[i]->cbin, 0, sizeof(DCOLOR)*bp->mca[i]->nbins);
232          bp->ndx = ndx;
233 +        bp->nadded = 1;
234          queue_output(bp);
235 +        output_catchup(0);
236   }
237  
238  
239 + /* Get results from child process and add to queue */
240 + static void
241 + queue_results(int k)
242 + {
243 +        BINQ    *bq = new_binq();       /* get results holder */
244 +        int     j;
245 +
246 +        bq->ndx = kida[k].r1;
247 +        bq->nadded = kida[k].nr;
248 +                                        /* read from child */
249 +        for (j = 0; j < nmods; j++)
250 +                if (fread(bq->mca[j]->cbin, sizeof(DCOLOR), bq->mca[j]->nbins,
251 +                                        kida[k].infp) != bq->mca[j]->nbins)
252 +                        error(SYSTEM, "read error from render process");
253 +                        
254 +        queue_output(bq);               /* put results in output queue */
255 +        kida[k].nr = 0;                 /* mark child as available */
256 + }
257 +
258 +
259   /* callback to set output spec to NULL (stdout) */
260   static int
261   set_stdout(const LUENT *le, void *p)
# Line 207 | Line 265 | set_stdout(const LUENT *le, void *p)
265   }
266  
267  
268 < /* Start child processes if we can */
268 > /* Start child processes if we can (call only once in parent!) */
269   int
270   in_rchild()
271   {
272 < #ifdef _WIN32
273 <        error(WARNING, "multiprocessing unsupported -- running solo");
274 <        nproc = 1;
275 <        return(1);
276 < #else
277 <                                        /* try to fork ourselves */
278 <        while (nchild < nproc) {
279 <                int     p0[2], p1[2];
280 <                int     pid;
281 <                                        /* prepare i/o pipes */
282 <                if (pipe(p0) < 0 || pipe(p1) < 0)
283 <                        error(SYSTEM, "pipe() call failed!");
284 <                pid = fork();           /* fork parent process */
285 <                if (pid == 0) {         /* if in child, set up & return true */
228 <                        close(p0[1]); close(p1[0]);
229 <                        dup2(p0[0], 0); close(p0[0]);
230 <                        dup2(p1[1], 1); close(p1[1]);
272 >        int     rval;
273 >
274 >        while (nchild < nproc) {        /* fork until target reached */
275 >                errno = 0;
276 >                rval = open_process(&kida[nchild].pr, NULL);
277 >                if (rval < 0)
278 >                        error(SYSTEM, "open_process() call failed");
279 >                if (rval == 0) {        /* if in child, set up & return true */
280 >                        lu_doall(&modconttab, &set_stdout, NULL);
281 >                        lu_done(&ofiletab);
282 >                        while (nchild--) {      /* don't share other pipes */
283 >                                close(kida[nchild].pr.w);
284 >                                fclose(kida[nchild].infp);
285 >                        }
286                          inpfmt = (sizeof(RREAL)==sizeof(double)) ? 'd' : 'f';
287                          outfmt = 'd';
288                          header = 0;
234                        waitflush = xres = 1;
289                          yres = 0;
290                          raysleft = 0;
291 <                        account = accumulate = 1;
292 <                        lu_doall(&modconttab, set_stdout, NULL);
293 <                        nchild = -1;
294 <                        return(1);      /* child return value */
291 >                        if (accumulate == 1) {
292 >                                waitflush = xres = 1;
293 >                                account = accumulate = 1;
294 >                        } else {        /* parent controls accumulation */
295 >                                waitflush = xres = 0;
296 >                                account = accumulate = 0;
297 >                        }
298 >                        return(1);      /* return "true" in child */
299                  }
300 <                if (pid < 0)
301 <                        error(SYSTEM, "fork() call failed!");
302 <                                        /* connect our pipes */
303 <                close(p0[0]); close(p1[1]);
304 <                kida[nchild].r = p1[0];
305 <                kida[nchild].w = p0[1];
306 <                kida[nchild].pid = pid;
249 <                kida[nchild].running = -1;
250 <                ++nchild;
300 >                if (rval != PIPE_BUF)
301 >                        error(CONSISTENCY, "bad value from open_process()");
302 >                                        /* connect to child's output */
303 >                kida[nchild].infp = fdopen(kida[nchild].pr.r, "rb");
304 >                if (kida[nchild].infp == NULL)
305 >                        error(SYSTEM, "out of memory in in_rchild()");
306 >                kida[nchild++].nr = 0;  /* mark as available */
307          }
308 <        return(0);                      /* parent return value */
308 > #ifdef getc_unlocked
309 >        for (rval = nchild; rval--; )   /* avoid mutex overhead */
310 >                flockfile(kida[rval].infp);
311   #endif
312 +        return(0);                      /* return "false" in parent */
313   }
314  
315  
316   /* Close child processes */
317   void
318 < end_children()
318 > end_children(int immed)
319   {
320          int     status;
321          
322 <        while (nchild-- > 0)
323 <                if ((status = close_process(&kida[nchild])) > 0) {
322 >        while (nchild > 0) {
323 >                nchild--;
324 > #ifdef SIGKILL
325 >                if (immed)              /* error mode -- quick exit */
326 >                        kill(kida[nchild].pr.pid, SIGKILL);
327 > #endif
328 >                if ((status = close_process(&kida[nchild].pr)) > 0 && !immed) {
329                          sprintf(errmsg,
330                                  "rendering process returned bad status (%d)",
331                                          status);
332                          error(WARNING, errmsg);
333                  }
334 +                fclose(kida[nchild].infp);
335 +        }
336   }
337  
338  
339 < /* Wait for the next available child */
339 > /* Wait for the next available child, managing output queue simultaneously */
340   static int
341 < next_child_nq(int force_wait)
341 > next_child_nq(int flushing)
342   {
343 <        fd_set  readset, errset;
344 <        int     i, j, n, nr;
343 >        static struct timeval   polling;
344 >        struct timeval          *pmode;
345 >        fd_set                  readset, errset;
346 >        int                     i, n, nr, nqr;
347  
348 <        if (!force_wait)                /* see if there's one free */
348 >        if (!flushing)                  /* see if there's one free */
349                  for (i = nchild; i--; )
350 <                        if (kida[i].running < 0)
350 >                        if (!kida[i].nr)
351                                  return(i);
352 +
353 +        nqr = queue_ready();            /* choose blocking mode or polling */
354 +        if ((nqr > 0) & !flushing)
355 +                pmode = &polling;
356 +        else
357 +                pmode = NULL;
358 + tryagain:                               /* catch up with output? */
359 +        if (pmode == &polling) {
360 +                if (nqr > nchild)       /* don't get too far behind */
361 +                        nqr -= output_catchup(nqr-nchild);
362 +        } else if (nqr > 0)             /* clear output before blocking */
363 +                nqr -= output_catchup(0);
364                                          /* prepare select() call */
365          FD_ZERO(&readset); FD_ZERO(&errset);
366          n = nr = 0;
367          for (i = nchild; i--; ) {
368 <                if (kida[i].running > 0) {
369 <                        FD_SET(kida[i].r, &readset);
368 >                if (kida[i].nr) {
369 >                        FD_SET(kida[i].pr.r, &readset);
370                          ++nr;
371                  }
372 <                FD_SET(kida[i].r, &errset);
373 <                if (kida[i].r >= n)
374 <                        n = kida[i].r + 1;
372 >                FD_SET(kida[i].pr.r, &errset);
373 >                if (kida[i].pr.r >= n)
374 >                        n = kida[i].pr.r + 1;
375          }
376 <        if (!nr)                        /* nothing going on */
376 >        if (!nr)                        /* nothing to wait for? */
377                  return(-1);
378 <        if (nr > 1) {                   /* call select if multiple busy */
378 >        if ((nr > 1) | (pmode == &polling)) {
379                  errno = 0;
380 <                if (select(n, &readset, NULL, &errset, NULL) < 0)
381 <                        error(SYSTEM, "select call error in next_child_nq()");
380 >                nr = select(n, &readset, NULL, &errset, pmode);
381 >                if (!nr) {
382 >                        pmode = NULL;   /* try again, blocking this time */
383 >                        goto tryagain;
384 >                }
385 >                if (nr < 0)
386 >                        error(SYSTEM, "select() error in next_child_nq()");
387          } else
388                  FD_ZERO(&errset);
389          n = -1;                         /* read results from child(ren) */
390          for (i = nchild; i--; ) {
391 <                BINQ    *bq;
307 <                if (FD_ISSET(kida[i].r, &errset))
391 >                if (FD_ISSET(kida[i].pr.r, &errset))
392                          error(USER, "rendering process died");
393 <                if (!FD_ISSET(kida[i].r, &readset))
394 <                        continue;
311 <                bq = new_binq();        /* get results holder */
312 <                bq->ndx = kida[i].running;
313 <                                        /* read from child */
314 <                for (j = 0; j < nmods; j++) {
315 <                        n = sizeof(DCOLOR)*bq->mca[j]->nbins;
316 <                        nr = readbuf(kida[i].r, (char *)bq->mca[j]->cbin, n);
317 <                        if (nr != n)
318 <                                error(SYSTEM, "read error from render process");
319 <                }
320 <                queue_output(bq);       /* put results in output queue */
321 <                kida[i].running = -1;   /* mark child as available */
322 <                n = i;
393 >                if (FD_ISSET(kida[i].pr.r, &readset))
394 >                        queue_results(n = i);
395          }
396 <        return(n);                      /* last available child */
396 >        return(n);                      /* first available child */
397   }
398  
399  
# Line 329 | Line 401 | next_child_nq(int force_wait)
401   void
402   parental_loop()
403   {
404 +        const int       qlimit = (accumulate == 1) ? 1 : MAXIQ-1;
405 +        int             ninq = 0;
406 +        FVECT           orgdir[2*MAXIQ];
407 +        int             i, n;
408 +                                        /* load rays from stdin & process */
409 + #ifdef getc_unlocked
410 +        flockfile(stdin);               /* avoid lock/unlock overhead */
411 + #endif
412 +        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
413 +                const int       zero_ray = orgdir[2*ninq+1][0] == 0.0 &&
414 +                                                (orgdir[2*ninq+1][1] == 0.0) &
415 +                                                (orgdir[2*ninq+1][2] == 0.0);
416 +                ninq += !zero_ray;
417 +                                /* Zero ray cannot go in input queue */
418 +                if (zero_ray ? ninq : ninq >= qlimit ||
419 +                            lastray/accumulate != (lastray+ninq)/accumulate) {
420 +                        i = next_child_nq(0);           /* manages output */
421 +                        n = ninq;
422 +                        if (accumulate > 1)             /* need terminator? */
423 +                                memset(orgdir[2*n++], 0, sizeof(FVECT)*2);
424 +                        n *= sizeof(FVECT)*2;           /* send assignment */
425 +                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
426 +                                error(SYSTEM, "pipe write error");
427 +                        kida[i].r1 = lastray+1;
428 +                        lastray += kida[i].nr = ninq;   /* mark as busy */
429 +                        if (lastray < lastdone) {       /* RNUMBER wrapped? */
430 +                                while (next_child_nq(1) >= 0)
431 +                                        ;
432 +                                lastray -= ninq;
433 +                                lastdone = lastray %= accumulate;
434 +                        }
435 +                        ninq = 0;
436 +                }
437 +                if (zero_ray) {                         /* put bogus record? */
438 +                        if ((yres <= 0) | (xres <= 1) &&
439 +                                        (lastray+1) % accumulate == 0) {
440 +                                while (next_child_nq(1) >= 0)
441 +                                        ;               /* clear the queue */
442 +                                lastdone = lastray = accumulate-1;
443 +                                waitflush = 1;          /* flush next */
444 +                        }
445 +                        put_zero_record(++lastray);
446 +                }
447 +                if (raysleft && !--raysleft)
448 +                        break;                          /* preemptive EOI */
449 +        }
450 +        while (next_child_nq(1) >= 0)           /* empty results queue */
451 +                ;
452 +        if (account < accumulate) {
453 +                error(WARNING, "partial accumulation in final record");
454 +                free_binq(out_bq);              /* XXX just ignore it */
455 +                out_bq = NULL;
456 +        }
457 +        free_binq(NULL);                        /* clean up */
458 +        lu_done(&ofiletab);
459 +        if (raysleft)
460 +                error(USER, "unexpected EOF on input");
461 + }
462 +
463 +
464 + /* Wait for the next available child by monitoring "to" pipes */
465 + static int
466 + next_child_ready()
467 + {
468 +        fd_set                  writeset, errset;
469 +        int                     i, n;
470 +
471 +        for (i = nchild; i--; )         /* see if there's one free first */
472 +                if (!kida[i].nr)
473 +                        return(i);
474 +                                        /* prepare select() call */
475 +        FD_ZERO(&writeset); FD_ZERO(&errset);
476 +        n = 0;
477 +        for (i = nchild; i--; ) {
478 +                FD_SET(kida[i].pr.w, &writeset);
479 +                FD_SET(kida[i].pr.r, &errset);
480 +                if (kida[i].pr.w >= n)
481 +                        n = kida[i].pr.w + 1;
482 +                if (kida[i].pr.r >= n)
483 +                        n = kida[i].pr.r + 1;
484 +        }
485 +        errno = 0;
486 +        n = select(n, NULL, &writeset, &errset, NULL);
487 +        if (n < 0)
488 +                error(SYSTEM, "select() error in next_child_ready()");
489 +        n = -1;                         /* identify waiting child */
490 +        for (i = nchild; i--; ) {
491 +                if (FD_ISSET(kida[i].pr.r, &errset))
492 +                        error(USER, "rendering process died");
493 +                if (FD_ISSET(kida[i].pr.w, &writeset))
494 +                        kida[n = i].nr = 0;
495 +        }
496 +        return(n);                      /* first available child */
497 + }
498 +
499 +
500 + /* Modified parental loop for full accumulation mode (-c 0) */
501 + void
502 + feeder_loop()
503 + {
504          static int      ignore_warning_given = 0;
505 <        FVECT           orgdir[2];
506 <        double          d;
505 >        int             ninq = 0;
506 >        FVECT           orgdir[2*MAXIQ];
507 >        int             i, n;
508                                          /* load rays from stdin & process */
509   #ifdef getc_unlocked
510          flockfile(stdin);               /* avoid lock/unlock overhead */
511   #endif
512 <        while (getvec(orgdir[0]) == 0 && getvec(orgdir[1]) == 0) {
513 <                d = normalize(orgdir[1]);
514 <                                                        /* asking for flush? */
515 <                if ((d == 0.0) & (accumulate != 1)) {
512 >        while (getvec(orgdir[2*ninq]) == 0 && getvec(orgdir[2*ninq+1]) == 0) {
513 >                if (orgdir[2*ninq+1][0] == 0.0 &&       /* asking for flush? */
514 >                                (orgdir[2*ninq+1][1] == 0.0) &
515 >                                (orgdir[2*ninq+1][2] == 0.0)) {
516                          if (!ignore_warning_given++)
517                                  error(WARNING,
518                                  "dummy ray(s) ignored during accumulation\n");
519                          continue;
520 <                }
521 <                if ((d == 0.0) | (lastray+1 < lastray)) {
522 <                        while (next_child_nq(1) >= 0)
523 <                                ;
524 <                        lastdone = lastray = 0;
352 <                }
353 <                if (d == 0.0) {
354 <                        if ((yres <= 0) | (xres <= 0))
355 <                                waitflush = 1;          /* flush right after */
356 <                        zero_record(++lastray);
357 <                } else {                                /* else assign */
358 <                        int     avail = next_child_nq(0);
359 <                        if (writebuf(kida[avail].w, (char *)orgdir,
360 <                                        sizeof(FVECT)*2) != sizeof(FVECT)*2)
520 >                }
521 >                if (++ninq >= MAXIQ) {
522 >                        i = next_child_ready();         /* get eager child */
523 >                        n = sizeof(FVECT)*2 * ninq;     /* give assignment */
524 >                        if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
525                                  error(SYSTEM, "pipe write error");
526 <                        kida[avail].running = ++lastray;
526 >                        kida[i].r1 = lastray+1;
527 >                        lastray += kida[i].nr = ninq;
528 >                        if (lastray < lastdone)         /* RNUMBER wrapped? */
529 >                                lastdone = lastray = 0;
530 >                        ninq = 0;
531                  }
532                  if (raysleft && !--raysleft)
533                          break;                          /* preemptive EOI */
534          }
535 <        while (next_child_nq(1) >= 0)           /* empty results queue */
536 <                ;
537 <                                                /* output accumulated record */
538 <        if (accumulate <= 0 || account < accumulate) {
539 <                int     i;
540 <                if (account < accumulate) {
541 <                        error(WARNING, "partial accumulation in final record");
542 <                        accumulate -= account;
375 <                }
376 <                for (i = 0; i < nmods; i++)
377 <                        mod_output(out_bq->mca[i]);
378 <                end_record();
535 >        if (ninq) {                             /* polish off input */
536 >                i = next_child_ready();
537 >                n = sizeof(FVECT)*2 * ninq;
538 >                if (writebuf(kida[i].pr.w, (char *)orgdir, n) != n)
539 >                        error(SYSTEM, "pipe write error");
540 >                kida[i].r1 = lastray+1;
541 >                lastray += kida[i].nr = ninq;
542 >                ninq = 0;
543          }
544 +        memset(orgdir, 0, sizeof(FVECT)*2);     /* get results */
545 +        for (i = nchild; i--; ) {
546 +                writebuf(kida[i].pr.w, (char *)orgdir, sizeof(FVECT)*2);
547 +                queue_results(i);
548 +        }
549 +        if (recover)                            /* and from before? */
550 +                queue_modifiers();
551 +        end_children(0);                        /* free up file descriptors */
552 +        for (i = 0; i < nmods; i++)
553 +                mod_output(out_bq->mca[i]);     /* output accumulated record */
554 +        end_record();
555 +        free_binq(out_bq);                      /* clean up */
556 +        out_bq = NULL;
557 +        free_binq(NULL);
558 +        lu_done(&ofiletab);
559          if (raysleft)
560                  error(USER, "unexpected EOF on input");
382        free_binq(NULL);                        /* clean up */
383        lu_done(&ofiletab);
561   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines