ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.15 by greg, Thu May 23 01:28:07 2024 UTC vs.
Revision 2.21 by greg, Tue Jun 4 21:47:55 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the original process.  Final conversion
9 + * and output to stdout is sorted by last child while its siblings send it
10 + * their record calculations.
11   */
12  
8 #include <signal.h>
13   #include <math.h>
14   #include "platform.h"
15   #include "rtprocess.h"
# Line 57 | Line 61 | int            cur_chan;                       /* if we're looping channels */
61   SUBPROC         *cproc = NULL;                  /* child process array */
62   int             nchildren = 0;                  /* # of child processes */
63   int             inchild = -1;                   /* our child ID (-1: parent) */
60 int             pgid = -1;                      /* process group ID */
61 int             nr_out = 0;                     /* # of rows output by kids */
64  
65   static int      checksymbolic(ROPMAT *rop);
66  
65 static void
66 on_sigio(int dummy)
67 {
68        nr_out++;                       /* happens when child outputs row */
69 }
70
67   static int
68   split_input(ROPMAT *rop)
69   {
# Line 550 | Line 546 | output_headinfo(FILE *fp)
546   }
547  
548   static int
549 + output_loop(void)
550 + {
551 +        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
552 +        int             i = nmats;
553 +        int             cur_child = 0;
554 +
555 +        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
556 +                rmx_reset(&mop[nmats].imx);
557 +        while (i-- > 0) {                               /* close input matrices */
558 +                fclose(mop[i].infp);            /* ! pclose() */
559 +                mop[i].infp = NULL;
560 +                rmx_reset(&mop[i].imx);
561 +                if (mop[i].rmp != &mop[i].imx) {
562 +                        rmx_free(mop[i].rmp);
563 +                        mop[i].rmp = &mop[i].imx;
564 +                }
565 +        }
566 + #ifdef getc_unlocked
567 +        flockfile(stdout);                              /* we own this, now */
568 + #endif
569 +        for ( ; ; ) {                                   /* loop until no more */
570 +                ssize_t         rv;
571 +                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
572 +                if (!rv)                                /* out of rows? */
573 +                        break;
574 +                if (rv != row_size) {
575 +                        fputs("Read error in output loop\n", stderr);
576 +                        return(0);
577 +                }                                       /* do final conversion */
578 +                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
579 +                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
580 +                        fputs("Conversion/write error in output loop\n", stderr);
581 +                        return(0);
582 +                }
583 +                cur_child++;
584 +                cur_child *= (cur_child < inchild);
585 +        }
586 +        return(fflush(stdout) != EOF);
587 + }
588 +
589 + static int
590   spawned_children(int np)
591   {
592          int     i, rv;
# Line 560 | Line 597 | spawned_children(int np)
597                  np = 1;
598          } else
599   #endif
600 <        if ((in_nrows > 0) & (np > in_nrows))
601 <                np = in_nrows;
600 >        if ((in_nrows > 0) & (np*4 > in_nrows))
601 >                np = in_nrows/4;
602                                  /* we'll be doing a row at a time */
603          for (i = 0; i < nmats; i++) {
604                  mop[i].imx.nrows = 1;
# Line 573 | Line 610 | spawned_children(int np)
610                                  goto memerror;
611                  }
612          }
613 <                                /* prep output row buffer */
613 >                                /* prep output row buffer(s) */
614          if (mcat || mop[nmats].preop.clen > 0) {
615                  if (!split_input(&mop[nmats]))  /* need separate buffer */
616                          return(0);
617                  if (mop[nmats].preop.clen > 0)
618                          mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
619                                                  mop[nmats].imx.ncomp;
583                mop[nmats].rmp->nrows = 1;
584                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
585                        goto memerror;
620          }
621          mop[nmats].imx.nrows = 1;
622          if (!rmx_prepare(&mop[nmats].imx))
623                  goto memerror;
624 <        if (np <= 1) {          /* single process return point */
624 >        if (mop[nmats].rmp != &mop[nmats].imx) {
625 >                mop[nmats].rmp->nrows = 1;
626 >                if (!rmx_prepare(mop[nmats].rmp))
627 >                        goto memerror;
628 >        }
629 >        if (np <= 1) {          /* single process return */
630   #ifdef getc_unlocked
631                  for (i = 0; i < nmats; i++)
632                          flockfile(mop[i].infp);
# Line 595 | Line 634 | spawned_children(int np)
634   #endif
635                  return(0);
636          }
598        pgid = setpgrp();       /* set process group ID */
599        signal(SIGIO, on_sigio);
637          fflush(stdout);         /* flush header & spawn children */
638 <        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np);
638 >        nchildren = np + 1;     /* extra child to sequence output */
639 >        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
640          if (!cproc)
641                  goto memerror;
642 <        nchildren = np;
643 <        for (i = 0; i < np; i++) {
644 <                cproc[i].flags = PF_FILT_OUT;
645 <                cproc[i].w = dup(1);
608 <                cproc[i].r = 0;
609 <                cproc[i].pid = -1;
642 >        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
643 >        cproc[nchildren-1].flags |= PF_FILT_OUT;
644 >                                /* start each child */
645 >        for (i = 0; i < nchildren; i++) {
646                  rv = open_process(&cproc[i], NULL);
647                  if (rv <= 0) break;
648          }
613        if (rv > 0)
614                return(1);      /* parent return value */
649          if (rv < 0) {
650                  perror("fork");
651 +                close_processes(cproc, i);
652                  exit(1);
653          }
654 +        if (rv) {               /* are we the parent? */
655 +                i = nchildren-1;        /* last child is sole reader */
656 +                while (i-- > 0) {
657 +                        close(cproc[i].r);
658 +                        cproc[i].r = -1;
659 +                }
660 +                return(1);      /* parent return value */
661 +        }
662          inchild = i;            /* our child index */
663 <        while (i-- > 0)         /* don't share siblings' pipes */
663 >        while (i-- > 0)         /* only parent writes siblings */
664                  close(cproc[i].w);
665 <        fpurge(stdin);          /* discard previous matrix input */
666 < #ifdef getc_unlocked
667 <        flockfile(stdin);
668 < #endif
669 <        for (i = 0; i < nmats; i++) {
665 >
666 >        if (inchild == nchildren-1)     /* last child sequences output */
667 >                exit(output_loop() ? 0 : 1);
668 >
669 >        i = inchild;            /* won't read from siblings */
670 >        while (i-- > 0)
671 >                close(cproc[i].r);
672 >        i = nmats;              /* redirect input matrix streams */
673 >        while (i-- > 0) {
674                  if (mop[i].infp != stdin)
675                          fclose(mop[i].infp);    /* ! pclose() */
676                  mop[i].infp = stdin;
677 <                mop[i].imx.dtype = DTdouble;
677 >                mop[i].imx.dtype = DTrmx_native;
678 >                mop[i].imx.pflags &= ~RMF_SWAPIN;
679          }
680 <        return(0);              /* child return */
680 >        fpurge(stdin);          /* discard any previous matrix input */
681 > #ifdef getc_unlocked
682 >        flockfile(stdin);
683 > #endif
684 >        mop[nmats].rmp->dtype = DTrmx_native;
685 >        return(0);              /* worker child return value */
686   memerror:
687          fputs("Out of memory in spawned_children()\n", stderr);
688          exit(1);
689   }
690  
691   static int
692 < parent_loop()
692 > parent_loop(void)
693   {
641        FILE    **outfp = (FILE **)malloc(nchildren*sizeof(FILE *));
694          int     i;
695  
696 <        if (!outfp) goto memerror;
697 <        for (i = 0; i < nchildren; i++) {
698 <                outfp[i] = fdopen(cproc[i].w, "w");
699 <                if (!outfp[i]) goto memerror;
648 < #ifdef getc_unlocked
649 <                flockfile(outfp[i]);
650 < #endif
696 >        rmx_reset(&mop[nmats].imx);             /* not touching output side */
697 >        if (mop[nmats].rmp != &mop[nmats].imx) {
698 >                rmx_free(mop[nmats].rmp);
699 >                mop[nmats].rmp = &mop[nmats].imx;
700          }
701   #ifdef getc_unlocked
702 <        for (i = 0; i < nmats; i++)
702 >        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
703                  flockfile(mop[i].infp);
704   #endif
705 +                                                /* load & send rows to kids */
706          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
707 <            FILE        *ofp = outfp[cur_row % nchildren];
707 >            int         wfd = cproc[cur_row % (nchildren-1)].w;
708              for (i = 0; i < nmats; i++)
709                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
710                          if (cur_row > in_nrows) /* unknown #input rows? */
711                                  break;
712 <                        fprintf(stderr, "%s: read error at row %d\n",
712 >                        fprintf(stderr, "%s: parent read error at row %d\n",
713                                          mop[i].inspec, cur_row);
714                          return(0);
715                  }
716              if (i < nmats)
717                  break;
718              for (i = 0; i < nmats; i++)
719 <                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
720 <                                        mop[i].imx.ncols, DTdouble, ofp))
721 <                        return(0);
672 <            if (fflush(ofp) == EOF)
673 <                return(0);
719 >                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
720 >                                        != rmx_array_size(&mop[i].imx))
721 >                        return(0);
722          }
675        for (i = 0; i < nchildren; i++)
676                fclose(outfp[i]);
677        free(outfp);
723          i = close_processes(cproc, nchildren);
724 <        free(cproc); cproc = NULL;
724 >        free(cproc); cproc = NULL; nchildren = 0;
725          if (i < 0) {
726 <                fputs("Warning: missing child in parent_loop()\n", stderr);
726 >                fputs("Warning: lost child in parent_loop()\n", stderr);
727                  return(1);
728          }
729          if (i > 0) {
730                  fprintf(stderr, "Child exited with status %d\n", i);
731                  return(0);
732          }
733 <        return(1);
733 >        return(1);                              /* return success! */
734   memerror:
735          fputs("Out of memory in parent_loop()\n", stderr);
736          exit(1);
737   }
738  
739   static int
740 < combine_input()
740 > combine_input(void)
741   {
742          const int       row0 = (inchild >= 0)*inchild;
743 <        const int       rstep = nchildren + !nchildren;
743 >        const int       rstep = nchildren ? nchildren-1 : 1;
744          ROPMAT          *res = &mop[nmats];
745          int             set_r, set_c;
746          RMATRIX         *tmp = NULL;
747          int             co_set;
703        sigset_t        iomask;
748          int             i;
749  
750          if (mcat && mcat_last &&
# Line 720 | Line 764 | combine_input()
764                  set_c = varlookup("c") != NULL && !vardefined("c");
765          } else                          /* save a little time */
766                  set_r = set_c = 0;
767 <
724 <        sigemptyset(&iomask);           /* read/process row-by-row */
725 <        sigaddset(&iomask, SIGIO);
767 >                                        /* read/process row-by-row */
768          for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
769              RMATRIX     *mres = NULL;
728            if (inchild >= 0) sigprocmask(SIG_BLOCK, &iomask, NULL);
770              for (i = 0; i < nmats; i++)
771                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
772                          if (cur_row > in_nrows) /* unknown #input rows? */
# Line 734 | Line 775 | combine_input()
775                                          mop[i].inspec, cur_row);
776                          return(0);
777                  }
737            if (inchild >= 0) sigprocmask(SIG_UNBLOCK, &iomask, NULL);
778              if (i < nmats)
779                  break;
780              for (i = 0; i < nmats; i++)
# Line 776 | Line 816 | combine_input()
816                          return(0);
817              }
818              rmx_free(mres); mres = NULL;
779            if (inchild >= 0) {         /* children share stdout */
780                while (nr_out < cur_row)
781                    pause();            /* wait for our turn */
782                sigprocmask(SIG_BLOCK, &iomask, NULL);
783            }
819              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
820                                  res->rmp->ncols, res->rmp->dtype, stdout))
821                  return(0);
822 <            if (inchild >= 0) {         /* flush and notify group */
823 <                if (fflush(stdout) == EOF)
789 <                    return(0);
790 <                sigprocmask(SIG_UNBLOCK, &iomask, NULL);
791 <                killpg(pgid, SIGIO);    /* increments everyone's nr_out */
792 <            }
822 >            if (inchild >= 0 && fflush(stdout) == EOF)
823 >                return(0);
824          }
825          return(inchild >= 0 || fflush(stdout) != EOF);
826   memerror:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines