ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.19 by greg, Mon Jun 3 18:55:51 2024 UTC vs.
Revision 2.20 by greg, Tue Jun 4 21:23:11 2024 UTC

# Line 6 | Line 6 | static const char RCSid[] = "$Id$";
6   *
7   * Multi-processing mode under Unix creates children that each work
8   * on one input row at a time, fed by the original process.  Final conversion
9 < * and output to stdout is sorted by first child while its siblings send it
9 > * and output to stdout is sorted by last child while its siblings send it
10   * their record calculations.
11   */
12  
# Line 545 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 < static void
549 < reset_inputs(void)
548 > static int
549 > output_loop(void)
550   {
551 <        int     i = nmats;
551 >        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
552 >        int             i = nmats;
553 >        int             cur_child = 0;
554  
555 <        fpurge(stdin);          /* discard previous matrix input */
556 <        while (i--) {
557 <                if (mop[i].infp != stdin)
558 <                        fclose(mop[i].infp);    /* ! pclose() */
559 <                mop[i].infp = stdin;
560 <                mop[i].imx.dtype = DTdouble;
561 <                mop[i].imx.pflags &= ~RMF_SWAPIN;
555 >        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
556 >                rmx_reset(&mop[nmats].imx);
557 >        while (i-- > 0) {                               /* close input matrices */
558 >                fclose(mop[i].infp);            /* ! pclose() */
559 >                mop[i].infp = NULL;
560 >                rmx_reset(&mop[i].imx);
561 >                if (mop[i].rmp != &mop[i].imx) {
562 >                        rmx_free(mop[i].rmp);
563 >                        mop[i].rmp = &mop[i].imx;
564 >                }
565          }
566 + #ifdef getc_unlocked
567 +        flockfile(stdout);                              /* we own this, now */
568 + #endif
569 +        for ( ; ; ) {                                   /* loop until no more */
570 +                ssize_t         rv;
571 +                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
572 +                if (!rv)                                /* out of rows? */
573 +                        break;
574 +                if (rv != row_size) {
575 +                        fputs("Read error in output loop\n", stderr);
576 +                        return(0);
577 +                }                                       /* do final conversion */
578 +                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
579 +                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
580 +                        fputs("Conversion/write error in output loop\n", stderr);
581 +                        return(0);
582 +                }
583 +                cur_child++;
584 +                cur_child *= (cur_child < inchild);
585 +        }
586 +        return(fflush(stdout) != EOF);
587   }
588  
589   static int
590   spawned_children(int np)
591   {
566        size_t  recsize = 0;
592          int     i, rv;
593  
594   #if defined(_WIN32) || defined(_WIN64)
# Line 579 | Line 604 | spawned_children(int np)
604                  mop[i].imx.nrows = 1;
605                  if (!rmx_prepare(&mop[i].imx))
606                          goto memerror;
582                recsize += rmx_array_size(&mop[i].imx);
607                  if (mop[i].rmp != &mop[i].imx) {
608                          mop[i].rmp->nrows = 1;
609                          if (!rmx_prepare(mop[i].rmp))
610                                  goto memerror;
611                  }
612          }
613 <                                /* prep output row buffer */
613 >                                /* prep output row buffer(s) */
614          if (mcat || mop[nmats].preop.clen > 0) {
615                  if (!split_input(&mop[nmats]))  /* need separate buffer */
616                          return(0);
617                  if (mop[nmats].preop.clen > 0)
618                          mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
619                                                  mop[nmats].imx.ncomp;
596                mop[nmats].rmp->nrows = 1;
597                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
598                        goto memerror;
620          }
621          mop[nmats].imx.nrows = 1;
622          if (!rmx_prepare(&mop[nmats].imx))
623                  goto memerror;
624 +        if (mop[nmats].rmp != &mop[nmats].imx) {
625 +                mop[nmats].rmp->nrows = 1;
626 +                if (!rmx_prepare(mop[nmats].rmp))
627 +                        goto memerror;
628 +        }
629          if (np <= 1) {          /* single process return */
630   #ifdef getc_unlocked
631                  for (i = 0; i < nmats; i++)
# Line 609 | Line 635 | spawned_children(int np)
635                  return(0);
636          }
637          fflush(stdout);         /* flush header & spawn children */
638 <        nchildren = np+1;
638 >        nchildren = np + 1;     /* extra child to sequence output */
639          cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
640          if (!cproc)
641                  goto memerror;
642 <                                /* first child becomes stdout filter */
643 <        cproc[0].flags = PF_FILT_OUT;
644 <        cproc[0].w = fileno(stdout);
645 <        cproc[0].r = 0;
620 <        cproc[0].pid = -1;
621 <        rv = open_process(&cproc[i], NULL);
622 <        if (rv < 0) {
623 <                perror("fork");
624 <                exit(1);
625 <        }
626 <        if (rv == 0) {          /* loop if first child */
627 <                inchild = 0;
628 <                reset_inputs();
629 <                output_loop();
630 <                _exit(0);
631 <        }
632 <                                /* start other children */
633 <        for (i = 1; (i < nchildren) & (rv > 0); i++) {
634 <                cproc[i].flags = 0;
635 <                cproc[i].w = 1;
636 <                cproc[i].r = 0;
637 <                cproc[i].pid = -1;
642 >        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
643 >        cproc[nchildren-1].flags |= PF_FILT_OUT;
644 >                                /* start each child */
645 >        for (i = 0; i < nchildren; i++) {
646                  rv = open_process(&cproc[i], NULL);
647 +                if (rv <= 0) break;
648          }
649          if (rv < 0) {
650                  perror("fork");
651                  close_processes(cproc, i);
652                  exit(1);
653          }
654 <        if (rv > 0)             /* parent return? */
655 <                return(1);
654 >        if (rv) {               /* are we the parent? */
655 >                i = nchildren-1;        /* last child is sole reader */
656 >                while (i-- > 0)
657 >                        close(cproc[i].r);
658 >                return(1);      /* parent return value */
659 >        }
660          inchild = i;            /* our child index */
661 <        while (--i > 0)         /* don't share siblings' pipes */
661 >        while (i-- > 0)         /* only parent writes siblings */
662                  close(cproc[i].w);
663 <        reset_inputs();
663 >
664 >        if (inchild == nchildren-1)     /* last child sequences output */
665 >                exit(output_loop() ? 0 : 1);
666 >
667 >        i = inchild;            /* won't read from siblings */
668 >        while (i-- > 0)
669 >                close(cproc[i].r);
670 >        i = nmats;              /* close input matrix streams */
671 >        while (i-- > 0) {
672 >                if (mop[i].infp != stdin)
673 >                        fclose(mop[i].infp);    /* ! pclose() */
674 >                mop[i].infp = stdin;
675 >                mop[i].imx.dtype = DTrmx_native;
676 >                mop[i].imx.pflags &= ~RMF_SWAPIN;
677 >        }
678 >        fpurge(stdin);          /* discard any previous matrix input */
679   #ifdef getc_unlocked
680          flockfile(stdin);
681   #endif
682 <        return(0);              /* child return */
682 >        mop[nmats].rmp->dtype = DTrmx_native;
683 >        return(0);              /* worker child return value */
684   memerror:
685          fputs("Out of memory in spawned_children()\n", stderr);
686          exit(1);
# Line 662 | Line 691 | parent_loop(void)
691   {
692          int     i;
693  
694 <        rmx_reset(&mop[nmats].imx);             /* not doing output side */
695 <        if (mop[nmats].rmp != &mop[nmats].imx)
696 <                rmx_reset(mop[nmats].rmp);
694 >        rmx_reset(&mop[nmats].imx);             /* not touching output side */
695 >        if (mop[nmats].rmp != &mop[nmats].imx) {
696 >                rmx_free(mop[nmats].rmp);
697 >                mop[nmats].rmp = &mop[nmats].imx;
698 >        }
699   #ifdef getc_unlocked
700 <        for (i = 0; i < nmats; i++)
700 >        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
701                  flockfile(mop[i].infp);
702   #endif
703                                                  /* load & send rows to kids */
704          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
705 <            int wfd = cproc[1 + cur_row%(nchildren-1)].w;
705 >            int         wfd = cproc[cur_row % (nchildren-1)].w;
706              for (i = 0; i < nmats; i++)
707                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
708                          if (cur_row > in_nrows) /* unknown #input rows? */
709                                  break;
710 <                        fprintf(stderr, "%s: read error at row %d\n",
710 >                        fprintf(stderr, "%s: parent read error at row %d\n",
711                                          mop[i].inspec, cur_row);
712                          return(0);
713                  }
# Line 688 | Line 719 | parent_loop(void)
719                          return(0);
720          }
721          i = close_processes(cproc, nchildren);
722 <        free(cproc); cproc = NULL;
722 >        free(cproc); cproc = NULL; nchildren = 0;
723          if (i < 0) {
724 <                fputs("Warning: missing child in parent_loop()\n", stderr);
724 >                fputs("Warning: lost child in parent_loop()\n", stderr);
725                  return(1);
726          }
727          if (i > 0) {
728                  fprintf(stderr, "Child exited with status %d\n", i);
729                  return(0);
730          }
731 <        return(1);
731 >        return(1);                              /* return success! */
732   memerror:
733          fputs("Out of memory in parent_loop()\n", stderr);
734          exit(1);
# Line 706 | Line 737 | memerror:
737   static int
738   combine_input(void)
739   {
740 <        const int       row0 = (inchild > 0)*(inchild-1);
741 <        const int       rstep = nchildren + !nchildren;
740 >        const int       row0 = (inchild >= 0)*inchild;
741 >        const int       rstep = nchildren ? nchildren-1 : 1;
742          ROPMAT          *res = &mop[nmats];
743          int             set_r, set_c;
744          RMATRIX         *tmp = NULL;
# Line 783 | Line 814 | combine_input(void)
814                          return(0);
815              }
816              rmx_free(mres); mres = NULL;
786            if (inchild > 0) {          /* children share stdout */
787                i = getc(stdin);        /* signals it's our turn */
788                if (i != EOF) ungetc(i, stdin);
789            }
817              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
818                                  res->rmp->ncols, res->rmp->dtype, stdout))
819                  return(0);
820 <            if (inchild > 0 && fflush(stdout) == EOF)
820 >            if (inchild >= 0 && fflush(stdout) == EOF)
821                  return(0);
822          }
823 <        return(inchild > 0 || fflush(stdout) != EOF);
823 >        return(inchild >= 0 || fflush(stdout) != EOF);
824   memerror:
825          fputs("Out of buffer space in combine_input()\n", stderr);
826          return(0);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines