ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.14 by greg, Wed May 22 15:38:04 2024 UTC vs.
Revision 2.22 by greg, Tue Jun 4 22:05:23 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the original process.  Final conversion
9 + * and output to stdout is sorted by last child while its siblings send it
10 + * their record calculations.
11   */
12  
8 #include <signal.h>
13   #include <math.h>
14   #include "platform.h"
15   #include "rtprocess.h"
# Line 57 | Line 61 | int            cur_chan;                       /* if we're looping channels */
61   SUBPROC         *cproc = NULL;                  /* child process array */
62   int             nchildren = 0;                  /* # of child processes */
63   int             inchild = -1;                   /* our child ID (-1: parent) */
60 int             pgid = -1;                      /* process group ID */
61 int             nr_out = 0;                     /* # of rows output by kids */
64  
65 < static int      checksymbolic(ROPMAT *rop);
65 > extern int      checksymbolic(ROPMAT *rop);
66  
67 < static void
66 < on_sigio(int dummy)
67 < {
68 <        nr_out++;                       /* happens when child outputs row */
69 < }
70 <
71 < static int
67 > int
68   split_input(ROPMAT *rop)
69   {
70          if (rop->rmp == &rop->imx && !(rop->rmp = rmx_copy(&rop->imx))) {
# Line 80 | Line 76 | split_input(ROPMAT *rop)
76   }
77  
78   /* Check/set transform based on a reference input file */
79 < static int
79 > int
80   checkreffile(ROPMAT *rop)
81   {
82          static const char       *curRF = NULL;
# Line 143 | Line 139 | checkreffile(ROPMAT *rop)
139   }
140  
141   /* Compute conversion row from spectrum to one channel of RGB */
142 < static void
142 > void
143   rgbrow(ROPMAT *rop, int r, int p)
144   {
145          const int       nc = rop->imx.ncomp;
# Line 160 | Line 156 | rgbrow(ROPMAT *rop, int r, int p)
156   }
157  
158   /* Compute conversion row from spectrum to one channel of XYZ */
159 < static void
159 > void
160   xyzrow(ROPMAT *rop, int r, int p)
161   {
162          const int       nc = rop->imx.ncomp;
# Line 177 | Line 173 | xyzrow(ROPMAT *rop, int r, int p)
173   }
174  
175   /* Use the spectral sensitivity function to compute matrix coefficients */
176 < static void
176 > void
177   sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, int ncs, const float wlpt[4]))
178   {
179          const int       nc = rop->imx.ncomp;
# Line 192 | Line 188 | sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, in
188   }
189  
190   /* Check/set symbolic transform */
191 < static int
191 > int
192   checksymbolic(ROPMAT *rop)
193   {
194          const int       nc = rop->imx.ncomp;
# Line 306 | Line 302 | checksymbolic(ROPMAT *rop)
302          return(1);
303   }
304  
305 < static int
305 > int
306   get_component_xfm(ROPMAT *rop)
307   {
308          int     i, j;
# Line 389 | Line 385 | get_component_xfm(ROPMAT *rop)
385          return(1);
386   }
387  
388 < static int
388 > int
389   apply_op(RMATRIX *dst, const RMATRIX *src, const RUNARYOP *ro)
390   {
391          if (ro->clen > 0) {
# Line 408 | Line 404 | apply_op(RMATRIX *dst, const RMATRIX *src, const RUNAR
404          return(1);
405   }
406  
407 < static int
407 > int
408   open_input(ROPMAT *rop)
409   {
410          int     outtype;
# Line 430 | Line 426 | open_input(ROPMAT *rop)
426   }
427  
428   /* Return nominal wavelength associated with input component (return nm) */
429 < static double
429 > double
430   l_wavelength(char *nam)
431   {
432          double  comp = argument(1);
# Line 454 | Line 450 | l_wavelength(char *nam)
450   }
451  
452   /* Return ith input with optional channel selector */
453 < static double
453 > double
454   l_chanin(char *nam)
455   {
456          double  inp = argument(1);
# Line 479 | Line 475 | l_chanin(char *nam)
475          return(mop[mi].rmp->mtx[cur_col*in_ncomp + chan]);
476   }
477  
478 < static int
478 > int
479   initialize(RMATRIX *imp)
480   {
481          int     i;
# Line 526 | Line 522 | initialize(RMATRIX *imp)
522          return(1);
523   }
524  
525 < static void
525 > void
526   output_headinfo(FILE *fp)
527   {
528          int     i;
# Line 549 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 < static int
548 > int
549   spawned_children(int np)
550   {
551          int     i, rv;
# Line 560 | Line 556 | spawned_children(int np)
556                  np = 1;
557          } else
558   #endif
559 <        if ((in_nrows > 0) & (np > in_nrows))
560 <                np = in_nrows;
559 >        if ((in_nrows > 0) & (np*4 > in_nrows))
560 >                np = in_nrows/4;
561                                  /* we'll be doing a row at a time */
562          for (i = 0; i < nmats; i++) {
563                  mop[i].imx.nrows = 1;
# Line 573 | Line 569 | spawned_children(int np)
569                                  goto memerror;
570                  }
571          }
572 <                                /* prep output row buffer */
572 >                                /* prep output row buffer(s) */
573          if (mcat || mop[nmats].preop.clen > 0) {
574                  if (!split_input(&mop[nmats]))  /* need separate buffer */
575                          return(0);
576                  if (mop[nmats].preop.clen > 0)
577                          mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
578                                                  mop[nmats].imx.ncomp;
583                mop[nmats].rmp->nrows = 1;
584                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
585                        goto memerror;
579          }
580          mop[nmats].imx.nrows = 1;
581          if (!rmx_prepare(&mop[nmats].imx))
582                  goto memerror;
583 <        if (np <= 1) {          /* single process return point */
583 >        if (mop[nmats].rmp != &mop[nmats].imx) {
584 >                mop[nmats].rmp->nrows = 1;
585 >                if (!rmx_prepare(mop[nmats].rmp))
586 >                        goto memerror;
587 >        }
588 >        if (np <= 1) {          /* single process return */
589   #ifdef getc_unlocked
590                  for (i = 0; i < nmats; i++)
591                          flockfile(mop[i].infp);
# Line 595 | Line 593 | spawned_children(int np)
593   #endif
594                  return(0);
595          }
598        pgid = setpgrp();       /* set process group ID */
599        signal(SIGIO, on_sigio);
596          fflush(stdout);         /* flush header & spawn children */
597 <        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np);
597 >        nchildren = np + 1;     /* extra child to sequence output */
598 >        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
599          if (!cproc)
600                  goto memerror;
601 <        nchildren = np;
602 <        for (i = 0; i < np; i++) {
603 <                cproc[i].flags = PF_FILT_OUT;
604 <                cproc[i].w = dup(1);
608 <                cproc[i].r = 0;
609 <                cproc[i].pid = -1;
601 >        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
602 >        cproc[nchildren-1].flags |= PF_FILT_OUT;
603 >                                /* start each child */
604 >        for (i = 0; i < nchildren; i++) {
605                  rv = open_process(&cproc[i], NULL);
606                  if (rv <= 0) break;
607          }
613        if (rv > 0)
614                return(1);      /* parent return value */
608          if (rv < 0) {
609                  perror("fork");
610 +                close_processes(cproc, i);
611                  exit(1);
612          }
613 +        if (rv) {               /* are we the parent? */
614 +                i = nchildren-1;        /* last child is sole reader */
615 +                while (i-- > 0) {
616 +                        close(cproc[i].r);
617 +                        cproc[i].r = -1;
618 +                }
619 +                return(1);      /* parent return value */
620 +        }
621          inchild = i;            /* our child index */
622 <        while (i-- > 0)         /* don't share siblings' pipes */
622 >        while (i-- > 0)         /* only parent writes siblings */
623                  close(cproc[i].w);
624 <        fpurge(stdin);          /* discard previous matrix input */
625 < #ifdef getc_unlocked
626 <        flockfile(stdin);
627 < #endif
628 <        for (i = 0; i < nmats; i++) {
624 >
625 >        if (inchild == nchildren-1)
626 >                return(-1);     /* output process return value */
627 >
628 >        i = inchild;            /* won't read from siblings */
629 >        while (i-- > 0)
630 >                close(cproc[i].r);
631 >        i = nmats;              /* redirect input matrix streams */
632 >        while (i-- > 0) {
633                  if (mop[i].infp != stdin)
634                          fclose(mop[i].infp);    /* ! pclose() */
635                  mop[i].infp = stdin;
636 <                mop[i].imx.dtype = DTdouble;
636 >                mop[i].imx.dtype = DTrmx_native;
637 >                mop[i].imx.pflags &= ~RMF_SWAPIN;
638          }
639 <        return(0);              /* child return */
639 >        fpurge(stdin);          /* discard any previous matrix input */
640 > #ifdef getc_unlocked
641 >        flockfile(stdin);
642 > #endif
643 >        mop[nmats].rmp->dtype = DTrmx_native;
644 >        return(0);              /* worker child return value */
645   memerror:
646          fputs("Out of memory in spawned_children()\n", stderr);
647          exit(1);
648   }
649  
650 < static int
651 < parent_loop()
650 > int
651 > parent_loop(void)
652   {
641        FILE    **outfp = (FILE **)malloc(nchildren*sizeof(FILE *));
653          int     i;
654  
655 <        if (!outfp) goto memerror;
656 <        for (i = 0; i < nchildren; i++) {
657 <                outfp[i] = fdopen(cproc[i].w, "w");
658 <                if (!outfp[i]) goto memerror;
648 < #ifdef getc_unlocked
649 <                flockfile(outfp[i]);
650 < #endif
655 >        rmx_reset(&mop[nmats].imx);             /* not touching output side */
656 >        if (mop[nmats].rmp != &mop[nmats].imx) {
657 >                rmx_free(mop[nmats].rmp);
658 >                mop[nmats].rmp = &mop[nmats].imx;
659          }
660   #ifdef getc_unlocked
661 <        for (i = 0; i < nmats; i++)
661 >        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
662                  flockfile(mop[i].infp);
663   #endif
664 +                                                /* load & send rows to kids */
665          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
666 <            FILE        *ofp = outfp[cur_row % nchildren];
666 >            int         wfd = cproc[cur_row % (nchildren-1)].w;
667              for (i = 0; i < nmats; i++)
668                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
669                          if (cur_row > in_nrows) /* unknown #input rows? */
670                                  break;
671 <                        fprintf(stderr, "%s: read error at row %d\n",
671 >                        fprintf(stderr, "%s: parent read error at row %d\n",
672                                          mop[i].inspec, cur_row);
673                          return(0);
674                  }
675              if (i < nmats)
676                  break;
677              for (i = 0; i < nmats; i++)
678 <                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
679 <                                        mop[i].imx.ncols, DTdouble, ofp))
680 <                        return(0);
672 <            if (fflush(ofp) == EOF)
673 <                return(0);
678 >                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
679 >                                        != rmx_array_size(&mop[i].imx))
680 >                        return(0);
681          }
675        for (i = 0; i < nchildren; i++)
676                fclose(outfp[i]);
677        free(outfp);
682          i = close_processes(cproc, nchildren);
683 <        free(cproc); cproc = NULL;
683 >        free(cproc); cproc = NULL; nchildren = 0;
684          if (i < 0) {
685 <                fputs("Warning: missing child in parent_loop()\n", stderr);
685 >                fputs("Warning: lost child in parent_loop()\n", stderr);
686                  return(1);
687          }
688          if (i > 0) {
689                  fprintf(stderr, "Child exited with status %d\n", i);
690                  return(0);
691          }
692 <        return(1);
692 >        return(1);                              /* return success! */
693   memerror:
694          fputs("Out of memory in parent_loop()\n", stderr);
695          exit(1);
696   }
697  
698 < static int
699 < combine_input()
698 > int
699 > combine_input(void)
700   {
701          const int       row0 = (inchild >= 0)*inchild;
702 <        const int       rstep = nchildren + !nchildren;
702 >        const int       rstep = nchildren ? nchildren-1 : 1;
703          ROPMAT          *res = &mop[nmats];
704          int             set_r, set_c;
705          RMATRIX         *tmp = NULL;
706          int             co_set;
703        sigset_t        iomask;
707          int             i;
708  
709          if (mcat && mcat_last &&
# Line 720 | Line 723 | combine_input()
723                  set_c = varlookup("c") != NULL && !vardefined("c");
724          } else                          /* save a little time */
725                  set_r = set_c = 0;
726 <
724 <        sigemptyset(&iomask);           /* read/process row-by-row */
725 <        sigaddset(&iomask, SIGIO);
726 >                                        /* read/process row-by-row */
727          for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
728              RMATRIX     *mres = NULL;
728            if (inchild >= 0) sigprocmask(SIG_BLOCK, &iomask, NULL);
729              for (i = 0; i < nmats; i++)
730                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
731                          if (cur_row > in_nrows) /* unknown #input rows? */
# Line 734 | Line 734 | combine_input()
734                                          mop[i].inspec, cur_row);
735                          return(0);
736                  }
737            if (inchild >= 0) sigprocmask(SIG_UNBLOCK, &iomask, NULL);
737              if (i < nmats)
738                  break;
739              for (i = 0; i < nmats; i++)
# Line 776 | Line 775 | combine_input()
775                          return(0);
776              }
777              rmx_free(mres); mres = NULL;
779            if (inchild >= 0) {         /* children share stdout */
780                while (nr_out < cur_row)
781                    pause();            /* wait for our turn */
782                sigprocmask(SIG_BLOCK, &iomask, NULL);
783            }
778              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
779                                  res->rmp->ncols, res->rmp->dtype, stdout))
780                  return(0);
781 <            if (inchild >= 0) {         /* flush and notify group */
782 <                if (fflush(stdout) == EOF)
789 <                    return(0);
790 <                sigprocmask(SIG_UNBLOCK, &iomask, NULL);
791 <                killpg(pgid, SIGIO);    /* increments everyone's nr_out */
792 <            }
781 >            if (inchild >= 0 && fflush(stdout) == EOF)
782 >                return(0);
783          }
784          return(inchild >= 0 || fflush(stdout) != EOF);
785   memerror:
# Line 800 | Line 790 | multerror:
790          return(0);
791   }
792  
793 < static int
793 > int
794 > output_loop(void)
795 > {
796 >        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
797 >        int             i = nmats;
798 >        int             cur_child = 0;
799 >
800 >        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
801 >                rmx_reset(&mop[nmats].imx);
802 >        while (i-- > 0) {                               /* close input matrices */
803 >                fclose(mop[i].infp);            /* ! pclose() */
804 >                mop[i].infp = NULL;
805 >                rmx_reset(&mop[i].imx);
806 >                if (mop[i].rmp != &mop[i].imx) {
807 >                        rmx_free(mop[i].rmp);
808 >                        mop[i].rmp = &mop[i].imx;
809 >                }
810 >        }
811 > #ifdef getc_unlocked
812 >        flockfile(stdout);                              /* we own this, now */
813 > #endif
814 >        for ( ; ; ) {                                   /* loop until no more */
815 >                ssize_t         rv;
816 >                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
817 >                if (!rv)                                /* out of rows? */
818 >                        break;
819 >                if (rv != row_size) {
820 >                        fputs("Read error in output loop\n", stderr);
821 >                        return(0);
822 >                }                                       /* do final conversion */
823 >                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
824 >                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
825 >                        fputs("Conversion/write error in output loop\n", stderr);
826 >                        return(0);
827 >                }
828 >                cur_child++;
829 >                cur_child *= (cur_child < inchild);
830 >        }
831 >        return(fflush(stdout) != EOF);
832 > }
833 >
834 > int
835   get_factors(double da[], int n, char *av[])
836   {
837          int     ac;
# Line 810 | Line 841 | get_factors(double da[], int n, char *av[])
841          return(ac);
842   }
843  
844 < static void
844 > void
845   resize_inparr(int n2alloc)
846   {
847          int     i;
# Line 922 | Line 953 | main(int argc, char *argv[])
953                                  if (n && !isflt(argv[i+1])) {
954                                          mop[nmats].preop.csym = argv[++i];
955                                          mop[nmats].preop.clen = 0;
956 +                                        mcat_last = 0;
957                                          break;
958                                  }
959                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 1012 | Line 1044 | main(int argc, char *argv[])
1044                  return(1);
1045          }
1046          doptimize(1);                   /* optimize definitions */
1047 <        if (spawned_children(nproc))    /* running in parent process? */
1047 >        i = spawned_children(nproc);    /* create multiple processes if requested */
1048 >        if (i > 0)                      /* running in parent process? */
1049                  return(parent_loop() ? 0 : 1);
1050 <                                        /* process & write rows */
1050 >        if (i < 0)                      /* running in output process? */
1051 >                return(output_loop() ? 0 : 1);
1052 >                                        /* else we are a worker process */
1053          return(combine_input() ? 0 : 1);
1054   stdin_error:
1055          fprintf(stderr, "%s: %s used for more than one input\n",

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines