ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.18 by greg, Thu May 23 19:29:41 2024 UTC vs.
Revision 2.26 by greg, Fri Nov 8 17:52:26 2024 UTC

# Line 5 | Line 5 | static const char RCSid[] = "$Id$";
5   * General component matrix combiner, operating on a row at a time.
6   *
7   * Multi-processing mode under Unix creates children that each work
8 < * on one input row at a time, fed by the single parent.  Output to
9 < * a shared stdout is assured because each child waits for parent
10 < * to be ready with next input row before it outputs the previous result.
11 < * The assumption (checked in spawn_children()) is that the parent
12 < * will remain blocked until the child it is feeding has finished
13 < * flushing the previous row to stdout.  The final output row is
14 < * less guaranteed to be in order, so the parent sleeps for a few seconds
15 < * between each child pipe closure, in hopes that this allows enough
16 < * time for row output to finish in each process.
8 > * on one input row at a time, fed by the original process.  Final conversion
9 > * and output to stdout is sorted by last child while its siblings send it
10 > * their record calculations.
11   */
12  
13   #include <math.h>
# Line 68 | Line 62 | SUBPROC                *cproc = NULL;                  /* child process array */
62   int             nchildren = 0;                  /* # of child processes */
63   int             inchild = -1;                   /* our child ID (-1: parent) */
64  
65 < static int      checksymbolic(ROPMAT *rop);
65 > extern int      checksymbolic(ROPMAT *rop);
66  
67 < static int
67 > int
68   split_input(ROPMAT *rop)
69   {
70          if (rop->rmp == &rop->imx && !(rop->rmp = rmx_copy(&rop->imx))) {
# Line 82 | Line 76 | split_input(ROPMAT *rop)
76   }
77  
78   /* Check/set transform based on a reference input file */
79 < static int
79 > int
80   checkreffile(ROPMAT *rop)
81   {
82          static const char       *curRF = NULL;
# Line 145 | Line 139 | checkreffile(ROPMAT *rop)
139   }
140  
141   /* Compute conversion row from spectrum to one channel of RGB */
142 < static void
142 > void
143   rgbrow(ROPMAT *rop, int r, int p)
144   {
145          const int       nc = rop->imx.ncomp;
# Line 162 | Line 156 | rgbrow(ROPMAT *rop, int r, int p)
156   }
157  
158   /* Compute conversion row from spectrum to one channel of XYZ */
159 < static void
159 > void
160   xyzrow(ROPMAT *rop, int r, int p)
161   {
162          const int       nc = rop->imx.ncomp;
# Line 179 | Line 173 | xyzrow(ROPMAT *rop, int r, int p)
173   }
174  
175   /* Use the spectral sensitivity function to compute matrix coefficients */
176 < static void
177 < sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, int ncs, const float wlpt[4]))
176 > void
177 > sensrow(ROPMAT *rop, int r, double (*sf)(const SCOLOR sc, int ncs, const float wlpt[4]))
178   {
179          const int       nc = rop->imx.ncomp;
180          int             i;
# Line 194 | Line 188 | sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, in
188   }
189  
190   /* Check/set symbolic transform */
191 < static int
191 > int
192   checksymbolic(ROPMAT *rop)
193   {
194          const int       nc = rop->imx.ncomp;
# Line 308 | Line 302 | checksymbolic(ROPMAT *rop)
302          return(1);
303   }
304  
305 < static int
305 > int
306   get_component_xfm(ROPMAT *rop)
307   {
308          int     i, j;
# Line 391 | Line 385 | get_component_xfm(ROPMAT *rop)
385          return(1);
386   }
387  
388 < static int
388 > int
389   apply_op(RMATRIX *dst, const RMATRIX *src, const RUNARYOP *ro)
390   {
391          if (ro->clen > 0) {
# Line 410 | Line 404 | apply_op(RMATRIX *dst, const RMATRIX *src, const RUNAR
404          return(1);
405   }
406  
407 < static int
407 > int
408   open_input(ROPMAT *rop)
409   {
410          int     outtype;
# Line 432 | Line 426 | open_input(ROPMAT *rop)
426   }
427  
428   /* Return nominal wavelength associated with input component (return nm) */
429 < static double
429 > double
430   l_wavelength(char *nam)
431   {
432          double  comp = argument(1);
# Line 456 | Line 450 | l_wavelength(char *nam)
450   }
451  
452   /* Return ith input with optional channel selector */
453 < static double
453 > double
454   l_chanin(char *nam)
455   {
456          double  inp = argument(1);
# Line 481 | Line 475 | l_chanin(char *nam)
475          return(mop[mi].rmp->mtx[cur_col*in_ncomp + chan]);
476   }
477  
478 < static int
478 > int
479   initialize(RMATRIX *imp)
480   {
481          int     i;
# Line 494 | Line 488 | initialize(RMATRIX *imp)
488                  restype = mop[i].rmp->dtype;
489                  if (!imp->dtype || (restype = rmx_newtype(restype, imp->dtype)) > 0)
490                          imp->dtype = restype;
491 <                else
491 >                else if (!nowarn)
492                          fprintf(stderr, "%s: warning - data type mismatch\n",
493                                          mop[i].inspec);
494                  if (!i) {
# Line 528 | Line 522 | initialize(RMATRIX *imp)
522          return(1);
523   }
524  
525 < static void
525 > void
526   output_headinfo(FILE *fp)
527   {
528          int     i;
# Line 551 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 < static int
548 > int
549   spawned_children(int np)
550   {
557        size_t  recsize = 0;
551          int     i, rv;
552  
553   #if defined(_WIN32) || defined(_WIN64)
554          if (np > 1) {
555 <                fputs("Warning: only one process under Windows\n", stderr);
555 >                if (!nowarn)
556 >                        fputs("Warning: only one process under Windows\n", stderr);
557                  np = 1;
558          } else
559   #endif
# Line 570 | Line 564 | spawned_children(int np)
564                  mop[i].imx.nrows = 1;
565                  if (!rmx_prepare(&mop[i].imx))
566                          goto memerror;
573                recsize += rmx_array_size(&mop[i].imx);
567                  if (mop[i].rmp != &mop[i].imx) {
568                          mop[i].rmp->nrows = 1;
569                          if (!rmx_prepare(mop[i].rmp))
570                                  goto memerror;
571                  }
572          }
573 <                                /* prep output row buffer */
574 <        if (mcat || mop[nmats].preop.clen > 0) {
573 >                                /* prep output row buffer(s) */
574 >        if (mop[nmats].preop.clen > 0) {
575                  if (!split_input(&mop[nmats]))  /* need separate buffer */
583                        return(0);
584                if (mop[nmats].preop.clen > 0)
585                        mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
586                                                mop[nmats].imx.ncomp;
587                mop[nmats].rmp->nrows = 1;
588                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
576                          goto memerror;
577 +                mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
578 +                                        mop[nmats].imx.ncomp;
579          }
580          mop[nmats].imx.nrows = 1;
581          if (!rmx_prepare(&mop[nmats].imx))
582                  goto memerror;
583 +        if (mop[nmats].rmp != &mop[nmats].imx) {
584 +                mop[nmats].rmp->nrows = 1;
585 +                if (!rmx_prepare(mop[nmats].rmp))
586 +                        goto memerror;
587 +        }
588          if (np <= 1) {          /* single process return */
589   #ifdef getc_unlocked
590                  for (i = 0; i < nmats; i++)
# Line 600 | Line 594 | spawned_children(int np)
594                  return(0);
595          }
596          fflush(stdout);         /* flush header & spawn children */
597 <        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np);
597 >        nchildren = np + 1;     /* extra child to sequence output */
598 >        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
599          if (!cproc)
600                  goto memerror;
601 <        nchildren = np;
602 <        for (i = 0; i < np; i++) {
603 <                cproc[i].flags = PF_FILT_OUT;
604 <                cproc[i].w = dup(1);
605 <                cproc[i].r = 0;
606 <                cproc[i].pid = -1;
612 <                rv = open_process(&cproc[i], NULL);
613 <                if (rv <= 0) break;
614 <                if (!i && 2*rv >= recsize) {
615 <                        fputs("Problem too small for multi-processing\n",
616 <                                        stderr);
617 <                        close_processes(cproc, 1);
618 <                        exit(1);
619 <                }
620 <        }
601 >        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
602 >        cproc[nchildren-1].flags |= PF_FILT_OUT;
603 >                                /* start each child from parent */
604 >        for (i = 0; i < nchildren; i++)
605 >                if ((rv = open_process(&cproc[i], NULL)) <= 0)
606 >                        break;  /* child breaks here */
607          if (rv < 0) {
608 <                perror("fork");
608 >                perror("fork"); /* WTH? */
609                  close_processes(cproc, i);
610                  exit(1);
611          }
612 <        if (rv > 0)             /* parent return? */
613 <                return(1);
614 <        inchild = i;            /* our child index */
615 <        while (i-- > 0)         /* don't share siblings' pipes */
612 >        if (i != nchildren-1) { /* last child is sole reader */
613 >                int     j = i;
614 >                while (j-- > 0) {
615 >                        close(cproc[j].r);
616 >                        cproc[j].r = -1;
617 >                }
618 >        }
619 >        if (rv > 0)
620 >                return(1);      /* parent return value */
621 >
622 >        inchild = i;            /* else set our child index */
623 >        while (i-- > 0)         /* only parent writes siblings */
624                  close(cproc[i].w);
625 <        fpurge(stdin);          /* discard previous matrix input */
626 < #ifdef getc_unlocked
627 <        flockfile(stdin);
628 < #endif
629 <        for (i = 0; i < nmats; i++) {
630 <                if (mop[i].infp != stdin)
631 <                        fclose(mop[i].infp);    /* ! pclose() */
625 >
626 >        i = nmats;              /* close matrix streams (carefully) */
627 >        while (i-- > 0) {
628 >                if (mop[i].infp != stdin) {
629 >                        close(fileno(mop[i].infp));     /* avoid lseek() */
630 >                        fclose(mop[i].infp);            /* ! pclose() */
631 >                }
632 >                mop[i].infp = NULL;
633 >        }
634 >        fpurge(stdin);          /* discard previously buffered input */
635 >
636 >        if (inchild == nchildren-1)
637 >                return(-1);     /* output process return value */
638 >
639 >        i = nmats;              /* get matrix rows from parent */
640 >        while (i-- > 0) {
641                  mop[i].infp = stdin;
642 <                mop[i].imx.dtype = DTdouble;
642 >                mop[i].imx.dtype = DTrmx_native;
643                  mop[i].imx.pflags &= ~RMF_SWAPIN;
644          }
645 <        return(0);              /* child return */
645 > #ifdef getc_unlocked
646 >        flockfile(stdin);
647 > #endif
648 >        mop[nmats].rmp->dtype = DTrmx_native;
649 >        return(0);              /* worker child return value */
650   memerror:
651          fputs("Out of memory in spawned_children()\n", stderr);
652          exit(1);
653   }
654  
655 < static int
656 < parent_loop()
655 > int
656 > parent_loop(void)
657   {
651        FILE    **outfp = (FILE **)malloc(nchildren*sizeof(FILE *));
658          int     i;
659  
660 <        if (!outfp) goto memerror;
661 <        for (i = 0; i < nchildren; i++) {
662 <                outfp[i] = fdopen(cproc[i].w, "w");
663 <                if (!outfp[i]) goto memerror;
658 < #ifdef getc_unlocked
659 <                flockfile(outfp[i]);
660 < #endif
660 >        rmx_reset(&mop[nmats].imx);             /* not touching output side */
661 >        if (mop[nmats].rmp != &mop[nmats].imx) {
662 >                rmx_free(mop[nmats].rmp);
663 >                mop[nmats].rmp = &mop[nmats].imx;
664          }
665   #ifdef getc_unlocked
666 <        for (i = 0; i < nmats; i++)
666 >        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
667                  flockfile(mop[i].infp);
668   #endif
669 +                                                /* load & send rows to kids */
670          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
671 <            FILE        *ofp = outfp[cur_row % nchildren];
671 >            int         wfd = cproc[cur_row % (nchildren-1)].w;
672              for (i = 0; i < nmats; i++)
673                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
674                          if (cur_row > in_nrows) /* unknown #input rows? */
675                                  break;
676 <                        fprintf(stderr, "%s: read error at row %d\n",
676 >                        fprintf(stderr, "%s: parent_loop() load error at row %d\n",
677                                          mop[i].inspec, cur_row);
678                          return(0);
679                  }
680              if (i < nmats)
681                  break;
682              for (i = 0; i < nmats; i++)
683 <                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
684 <                                        mop[i].imx.ncols, DTdouble, ofp))
685 <                        return(0);
686 <            if (fflush(ofp) == EOF)
687 <                return(0);
683 >                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
684 >                                        != rmx_array_size(&mop[i].imx)) {
685 >                        fprintf(stderr, "%s: parent_loop() write error at row %d\n",
686 >                                        mop[i].inspec, cur_row);
687 >                        return(0);
688 >                }
689          }
690 <        for (i = 0; i < nchildren; i++) {       /* maintain output order */
691 <                sleep(1+(mop[nmats].rmp->ncols*mop[nmats].rmp->ncomp >> 15));
687 <                fclose(outfp[i]);
688 <        }
689 <        free(outfp);
690 <        i = close_processes(cproc, nchildren);
691 <        free(cproc); cproc = NULL;
690 >        i = close_processes(cproc, nchildren);  /* collect family */
691 >        free(cproc); cproc = NULL; nchildren = 0;
692          if (i < 0) {
693 <                fputs("Warning: missing child in parent_loop()\n", stderr);
693 >                if (!nowarn)
694 >                        fputs("Warning: lost child in parent_loop()\n", stderr);
695                  return(1);
696          }
697          if (i > 0) {
698                  fprintf(stderr, "Child exited with status %d\n", i);
699                  return(0);
700          }
701 <        return(1);
701 < memerror:
702 <        fputs("Out of memory in parent_loop()\n", stderr);
703 <        exit(1);
701 >        return(1);                              /* return success! */
702   }
703  
704 < static int
705 < combine_input()
704 > int
705 > combine_input(void)
706   {
707          const int       row0 = (inchild >= 0)*inchild;
708 <        const int       rstep = nchildren + !nchildren;
708 >        const int       rstep = nchildren ? nchildren-1 : 1;
709          ROPMAT          *res = &mop[nmats];
710          int             set_r, set_c;
711          RMATRIX         *tmp = NULL;
712          int             co_set;
713          int             i;
714  
715 <        if (mcat && mcat_last &&
716 <                        !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp)))
717 <                goto memerror;
715 >        if (mcat_last && !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp))) {
716 >                fputs("Out of buffer space in combine_input()\n", stderr);
717 >                return(0);
718 >        }
719                                          /* figure out what the user set */
720          co_set = fundefined("co");
721          if (!co_set)
# Line 731 | Line 730 | combine_input()
730                  set_c = varlookup("c") != NULL && !vardefined("c");
731          } else                          /* save a little time */
732                  set_r = set_c = 0;
734
733                                          /* read/process row-by-row */
734          for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
735              RMATRIX     *mres = NULL;
# Line 739 | Line 737 | combine_input()
737                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
738                          if (cur_row > in_nrows) /* unknown #input rows? */
739                                  break;
740 <                        fprintf(stderr, "%s: read error at row %d\n",
740 >                        fprintf(stderr, "%s: combine_input() load error at row %d\n",
741                                          mop[i].inspec, cur_row);
742                          return(0);
743                  }
# Line 784 | Line 782 | combine_input()
782                          return(0);
783              }
784              rmx_free(mres); mres = NULL;
787            if (inchild >= 0) {         /* children share stdout */
788                i = getc(stdin);        /* signals it's our turn */
789                if (i != EOF) ungetc(i, stdin);
790            }
785              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
786 <                                res->rmp->ncols, res->rmp->dtype, stdout))
786 >                                res->rmp->ncols, res->rmp->dtype, stdout) ||
787 >                                 (inchild >= 0 && fflush(stdout) == EOF)) {
788 >                fprintf(stderr, "Conversion/write error at row %d in combine_input()\n",
789 >                                cur_row);
790                  return(0);
791 <            if (inchild >= 0 && fflush(stdout) == EOF)
795 <                return(0);
791 >            }
792          }
793          return(inchild >= 0 || fflush(stdout) != EOF);
798 memerror:
799        fputs("Out of buffer space in combine_input()\n", stderr);
800        return(0);
794   multerror:
795          fputs("Unexpected matrix multiply error in combine_input()\n", stderr);
796          return(0);
797   }
798  
799 < static int
799 > int
800 > output_loop(void)
801 > {
802 >        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
803 >        int             cur_child = 0;
804 >        int             i = nmats;
805 >
806 >        while (i-- > 0) {                               /* free input buffers */
807 >                rmx_reset(&mop[i].imx);
808 >                if (mop[i].rmp != &mop[i].imx) {
809 >                        rmx_free(mop[i].rmp);
810 >                        mop[i].rmp = &mop[i].imx;
811 >                }
812 >        }
813 >        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
814 >                rmx_reset(&mop[nmats].imx);
815 > #ifdef getc_unlocked
816 >        flockfile(stdout);                              /* we own this, now */
817 > #endif
818 >        for ( ; ; ) {                                   /* loop until no more */
819 >                ssize_t         rv;
820 >                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
821 >                if (!rv)                                /* out of rows? */
822 >                        break;
823 >                if (rv != row_size) {
824 >                        fputs("Read error in output_loop()\n", stderr);
825 >                        return(0);
826 >                }                                       /* do final conversion */
827 >                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
828 >                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
829 >                        fputs("Conversion/write error in output_loop()\n", stderr);
830 >                        return(0);
831 >                }
832 >                cur_child++;
833 >                cur_child *= (cur_child < inchild);     /* loop over workers */
834 >        }
835 >        return(fflush(stdout) != EOF);
836 > }
837 >
838 > int
839   get_factors(double da[], int n, char *av[])
840   {
841          int     ac;
# Line 813 | Line 845 | get_factors(double da[], int n, char *av[])
845          return(ac);
846   }
847  
848 < static void
848 > void
849   resize_inparr(int n2alloc)
850   {
851          int     i;
852  
853          if (n2alloc == nall)
854                  return;
855 <        for (i = nall; i > n2alloc; i--) {
855 >        for (i = nall; i-- > n2alloc; ) {
856                  rmx_reset(&mop[i].imx);
857                  if (mop[i].rmp != &mop[i].imx)
858                          rmx_free(mop[i].rmp);
# Line 915 | Line 947 | main(int argc, char *argv[])
947                                  }
948                                  break;
949                          case 'C':
950 +                                mcat_last = 0;
951                                  if (!n || isflt(argv[i+1]))
952                                          goto userr;
953                                  defCsym = mop[nmats].preop.csym = argv[++i];
954                                  mop[nmats].preop.clen = 0;
922                                mcat_last = 0;
955                                  break;
956                          case 'c':
957 +                                mcat_last = 0;
958                                  if (n && !isflt(argv[i+1])) {
959                                          mop[nmats].preop.csym = argv[++i];
960                                          mop[nmats].preop.clen = 0;
928                                        mcat_last = 0;
961                                          break;
962                                  }
963                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 938 | Line 970 | main(int argc, char *argv[])
970                                          goto userr;
971                                  }
972                                  mop[nmats].preop.csym = NULL;
941                                mcat_last = 0;
973                                  break;
974                          case 'm':
975 +                                mcat_last = 1;
976                                  if (!n) goto userr;
977                                  if (argv[++i][0] == '-' && !argv[i][1]) {
978                                          if (stdin_used++) goto stdin_error;
979                                          mcat_spec = stdin_name;
980                                  } else
981                                          mcat_spec = argv[i];
950                                mcat_last = 1;
982                                  break;
983                          default:
984                                  fprintf(stderr, "%s: unknown option '%s'\n",
# Line 1016 | Line 1047 | main(int argc, char *argv[])
1047                  return(1);
1048          }
1049          doptimize(1);                   /* optimize definitions */
1050 <        if (spawned_children(nproc))    /* running in parent process? */
1050 >        i = spawned_children(nproc);    /* create multiple processes if requested */
1051 >        if (i > 0)                      /* running in parent process? */
1052                  return(parent_loop() ? 0 : 1);
1053 <                                        /* process & write rows */
1053 >        if (i < 0)                      /* running in output process? */
1054 >                return(output_loop() ? 0 : 1);
1055 >                                        /* else we are a worker process */
1056          return(combine_input() ? 0 : 1);
1057   stdin_error:
1058          fprintf(stderr, "%s: %s used for more than one input\n",

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines