ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.19 by greg, Mon Jun 3 18:55:51 2024 UTC vs.
Revision 2.23 by greg, Fri Jun 7 03:55:59 2024 UTC

# Line 6 | Line 6 | static const char RCSid[] = "$Id$";
6   *
7   * Multi-processing mode under Unix creates children that each work
8   * on one input row at a time, fed by the original process.  Final conversion
9 < * and output to stdout is sorted by first child while its siblings send it
9 > * and output to stdout is sorted by last child while its siblings send it
10   * their record calculations.
11   */
12  
# Line 62 | Line 62 | SUBPROC                *cproc = NULL;                  /* child process array */
62   int             nchildren = 0;                  /* # of child processes */
63   int             inchild = -1;                   /* our child ID (-1: parent) */
64  
65 < static int      checksymbolic(ROPMAT *rop);
65 > extern int      checksymbolic(ROPMAT *rop);
66  
67 < static int
67 > int
68   split_input(ROPMAT *rop)
69   {
70          if (rop->rmp == &rop->imx && !(rop->rmp = rmx_copy(&rop->imx))) {
# Line 76 | Line 76 | split_input(ROPMAT *rop)
76   }
77  
78   /* Check/set transform based on a reference input file */
79 < static int
79 > int
80   checkreffile(ROPMAT *rop)
81   {
82          static const char       *curRF = NULL;
# Line 139 | Line 139 | checkreffile(ROPMAT *rop)
139   }
140  
141   /* Compute conversion row from spectrum to one channel of RGB */
142 < static void
142 > void
143   rgbrow(ROPMAT *rop, int r, int p)
144   {
145          const int       nc = rop->imx.ncomp;
# Line 156 | Line 156 | rgbrow(ROPMAT *rop, int r, int p)
156   }
157  
158   /* Compute conversion row from spectrum to one channel of XYZ */
159 < static void
159 > void
160   xyzrow(ROPMAT *rop, int r, int p)
161   {
162          const int       nc = rop->imx.ncomp;
# Line 173 | Line 173 | xyzrow(ROPMAT *rop, int r, int p)
173   }
174  
175   /* Use the spectral sensitivity function to compute matrix coefficients */
176 < static void
176 > void
177   sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, int ncs, const float wlpt[4]))
178   {
179          const int       nc = rop->imx.ncomp;
# Line 188 | Line 188 | sensrow(ROPMAT *rop, int r, double (*sf)(SCOLOR sc, in
188   }
189  
190   /* Check/set symbolic transform */
191 < static int
191 > int
192   checksymbolic(ROPMAT *rop)
193   {
194          const int       nc = rop->imx.ncomp;
# Line 302 | Line 302 | checksymbolic(ROPMAT *rop)
302          return(1);
303   }
304  
305 < static int
305 > int
306   get_component_xfm(ROPMAT *rop)
307   {
308          int     i, j;
# Line 385 | Line 385 | get_component_xfm(ROPMAT *rop)
385          return(1);
386   }
387  
388 < static int
388 > int
389   apply_op(RMATRIX *dst, const RMATRIX *src, const RUNARYOP *ro)
390   {
391          if (ro->clen > 0) {
# Line 404 | Line 404 | apply_op(RMATRIX *dst, const RMATRIX *src, const RUNAR
404          return(1);
405   }
406  
407 < static int
407 > int
408   open_input(ROPMAT *rop)
409   {
410          int     outtype;
# Line 426 | Line 426 | open_input(ROPMAT *rop)
426   }
427  
428   /* Return nominal wavelength associated with input component (return nm) */
429 < static double
429 > double
430   l_wavelength(char *nam)
431   {
432          double  comp = argument(1);
# Line 450 | Line 450 | l_wavelength(char *nam)
450   }
451  
452   /* Return ith input with optional channel selector */
453 < static double
453 > double
454   l_chanin(char *nam)
455   {
456          double  inp = argument(1);
# Line 475 | Line 475 | l_chanin(char *nam)
475          return(mop[mi].rmp->mtx[cur_col*in_ncomp + chan]);
476   }
477  
478 < static int
478 > int
479   initialize(RMATRIX *imp)
480   {
481          int     i;
# Line 522 | Line 522 | initialize(RMATRIX *imp)
522          return(1);
523   }
524  
525 < static void
525 > void
526   output_headinfo(FILE *fp)
527   {
528          int     i;
# Line 545 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 < static void
549 < reset_inputs(void)
550 < {
551 <        int     i = nmats;
552 <
553 <        fpurge(stdin);          /* discard previous matrix input */
554 <        while (i--) {
555 <                if (mop[i].infp != stdin)
556 <                        fclose(mop[i].infp);    /* ! pclose() */
557 <                mop[i].infp = stdin;
558 <                mop[i].imx.dtype = DTdouble;
559 <                mop[i].imx.pflags &= ~RMF_SWAPIN;
560 <        }
561 < }
562 <
563 < static int
548 > int
549   spawned_children(int np)
550   {
566        size_t  recsize = 0;
551          int     i, rv;
552  
553   #if defined(_WIN32) || defined(_WIN64)
# Line 579 | Line 563 | spawned_children(int np)
563                  mop[i].imx.nrows = 1;
564                  if (!rmx_prepare(&mop[i].imx))
565                          goto memerror;
582                recsize += rmx_array_size(&mop[i].imx);
566                  if (mop[i].rmp != &mop[i].imx) {
567                          mop[i].rmp->nrows = 1;
568                          if (!rmx_prepare(mop[i].rmp))
569                                  goto memerror;
570                  }
571          }
572 <                                /* prep output row buffer */
573 <        if (mcat || mop[nmats].preop.clen > 0) {
572 >                                /* prep output row buffer(s) */
573 >        if (mop[nmats].preop.clen > 0) {
574                  if (!split_input(&mop[nmats]))  /* need separate buffer */
592                        return(0);
593                if (mop[nmats].preop.clen > 0)
594                        mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
595                                                mop[nmats].imx.ncomp;
596                mop[nmats].rmp->nrows = 1;
597                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
575                          goto memerror;
576 +                mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
577 +                                        mop[nmats].imx.ncomp;
578          }
579          mop[nmats].imx.nrows = 1;
580          if (!rmx_prepare(&mop[nmats].imx))
581                  goto memerror;
582 +        if (mop[nmats].rmp != &mop[nmats].imx) {
583 +                mop[nmats].rmp->nrows = 1;
584 +                if (!rmx_prepare(mop[nmats].rmp))
585 +                        goto memerror;
586 +        }
587          if (np <= 1) {          /* single process return */
588   #ifdef getc_unlocked
589                  for (i = 0; i < nmats; i++)
# Line 609 | Line 593 | spawned_children(int np)
593                  return(0);
594          }
595          fflush(stdout);         /* flush header & spawn children */
596 <        nchildren = np+1;
596 >        nchildren = np + 1;     /* extra child to sequence output */
597          cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
598          if (!cproc)
599                  goto memerror;
600 <                                /* first child becomes stdout filter */
601 <        cproc[0].flags = PF_FILT_OUT;
602 <        cproc[0].w = fileno(stdout);
603 <        cproc[0].r = 0;
604 <        cproc[0].pid = -1;
605 <        rv = open_process(&cproc[i], NULL);
600 >        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
601 >        cproc[nchildren-1].flags |= PF_FILT_OUT;
602 >                                /* start each child from parent */
603 >        for (i = 0; i < nchildren; i++)
604 >                if ((rv = open_process(&cproc[i], NULL)) <= 0)
605 >                        break;  /* child breaks here */
606          if (rv < 0) {
607 <                perror("fork");
607 >                perror("fork"); /* WTH? */
608 >                close_processes(cproc, i);
609                  exit(1);
610          }
611 <        if (rv == 0) {          /* loop if first child */
612 <                inchild = 0;
613 <                reset_inputs();
614 <                output_loop();
615 <                _exit(0);
611 >        if (i != nchildren-1) { /* last child is sole reader */
612 >                int     j = i;
613 >                while (j-- > 0) {
614 >                        close(cproc[j].r);
615 >                        cproc[j].r = -1;
616 >                }
617          }
618 <                                /* start other children */
619 <        for (i = 1; (i < nchildren) & (rv > 0); i++) {
620 <                cproc[i].flags = 0;
621 <                cproc[i].w = 1;
622 <                cproc[i].r = 0;
623 <                cproc[i].pid = -1;
624 <                rv = open_process(&cproc[i], NULL);
618 >        if (rv > 0)
619 >                return(1);      /* parent return value */
620 >
621 >        inchild = i;            /* else set our child index */
622 >        while (i-- > 0)         /* only parent writes siblings */
623 >                close(cproc[i].w);
624 >
625 >        i = nmats;              /* close matrix streams (carefully) */
626 >        while (i-- > 0) {
627 >                if (mop[i].infp != stdin) {
628 >                        close(fileno(mop[i].infp));     /* avoid lseek() */
629 >                        fclose(mop[i].infp);            /* ! pclose() */
630 >                }
631 >                mop[i].infp = NULL;
632          }
633 <        if (rv < 0) {
634 <                perror("fork");
635 <                close_processes(cproc, i);
636 <                exit(1);
633 >        fpurge(stdin);          /* discard previously buffered input */
634 >
635 >        if (inchild == nchildren-1)
636 >                return(-1);     /* output process return value */
637 >
638 >        i = nmats;              /* get matrix rows from parent */
639 >        while (i-- > 0) {
640 >                mop[i].infp = stdin;
641 >                mop[i].imx.dtype = DTrmx_native;
642 >                mop[i].imx.pflags &= ~RMF_SWAPIN;
643          }
645        if (rv > 0)             /* parent return? */
646                return(1);
647        inchild = i;            /* our child index */
648        while (--i > 0)         /* don't share siblings' pipes */
649                close(cproc[i].w);
650        reset_inputs();
644   #ifdef getc_unlocked
645          flockfile(stdin);
646   #endif
647 <        return(0);              /* child return */
647 >        mop[nmats].rmp->dtype = DTrmx_native;
648 >        return(0);              /* worker child return value */
649   memerror:
650          fputs("Out of memory in spawned_children()\n", stderr);
651          exit(1);
652   }
653  
654 < static int
654 > int
655   parent_loop(void)
656   {
657          int     i;
658  
659 <        rmx_reset(&mop[nmats].imx);             /* not doing output side */
660 <        if (mop[nmats].rmp != &mop[nmats].imx)
661 <                rmx_reset(mop[nmats].rmp);
659 >        rmx_reset(&mop[nmats].imx);             /* not touching output side */
660 >        if (mop[nmats].rmp != &mop[nmats].imx) {
661 >                rmx_free(mop[nmats].rmp);
662 >                mop[nmats].rmp = &mop[nmats].imx;
663 >        }
664   #ifdef getc_unlocked
665 <        for (i = 0; i < nmats; i++)
665 >        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
666                  flockfile(mop[i].infp);
667   #endif
668                                                  /* load & send rows to kids */
669          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
670 <            int wfd = cproc[1 + cur_row%(nchildren-1)].w;
670 >            int         wfd = cproc[cur_row % (nchildren-1)].w;
671              for (i = 0; i < nmats; i++)
672                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
673                          if (cur_row > in_nrows) /* unknown #input rows? */
674                                  break;
675 <                        fprintf(stderr, "%s: read error at row %d\n",
675 >                        fprintf(stderr, "%s: parent_loop() load error at row %d\n",
676                                          mop[i].inspec, cur_row);
677                          return(0);
678                  }
# Line 684 | Line 680 | parent_loop(void)
680                  break;
681              for (i = 0; i < nmats; i++)
682                  if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
683 <                                        != rmx_array_size(&mop[i].imx))
683 >                                        != rmx_array_size(&mop[i].imx)) {
684 >                        fprintf(stderr, "%s: parent_loop() write error at row %d\n",
685 >                                        mop[i].inspec, cur_row);
686                          return(0);
687 +                }
688          }
689 <        i = close_processes(cproc, nchildren);
690 <        free(cproc); cproc = NULL;
689 >        i = close_processes(cproc, nchildren);  /* collect family */
690 >        free(cproc); cproc = NULL; nchildren = 0;
691          if (i < 0) {
692 <                fputs("Warning: missing child in parent_loop()\n", stderr);
692 >                fputs("Warning: lost child in parent_loop()\n", stderr);
693                  return(1);
694          }
695          if (i > 0) {
696                  fprintf(stderr, "Child exited with status %d\n", i);
697                  return(0);
698          }
699 <        return(1);
701 < memerror:
702 <        fputs("Out of memory in parent_loop()\n", stderr);
703 <        exit(1);
699 >        return(1);                              /* return success! */
700   }
701  
702 < static int
702 > int
703   combine_input(void)
704   {
705 <        const int       row0 = (inchild > 0)*(inchild-1);
706 <        const int       rstep = nchildren + !nchildren;
705 >        const int       row0 = (inchild >= 0)*inchild;
706 >        const int       rstep = nchildren ? nchildren-1 : 1;
707          ROPMAT          *res = &mop[nmats];
708          int             set_r, set_c;
709          RMATRIX         *tmp = NULL;
710          int             co_set;
711          int             i;
712  
713 <        if (mcat && mcat_last &&
714 <                        !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp)))
715 <                goto memerror;
713 >        if (mcat_last && !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp))) {
714 >                fputs("Out of buffer space in combine_input()\n", stderr);
715 >                return(0);
716 >        }
717                                          /* figure out what the user set */
718          co_set = fundefined("co");
719          if (!co_set)
# Line 738 | Line 735 | combine_input(void)
735                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
736                          if (cur_row > in_nrows) /* unknown #input rows? */
737                                  break;
738 <                        fprintf(stderr, "%s: read error at row %d\n",
738 >                        fprintf(stderr, "%s: combine_input() load error at row %d\n",
739                                          mop[i].inspec, cur_row);
740                          return(0);
741                  }
# Line 783 | Line 780 | combine_input(void)
780                          return(0);
781              }
782              rmx_free(mres); mres = NULL;
786            if (inchild > 0) {          /* children share stdout */
787                i = getc(stdin);        /* signals it's our turn */
788                if (i != EOF) ungetc(i, stdin);
789            }
783              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
784 <                                res->rmp->ncols, res->rmp->dtype, stdout))
784 >                                res->rmp->ncols, res->rmp->dtype, stdout) ||
785 >                                 (inchild >= 0 && fflush(stdout) == EOF)) {
786 >                fprintf(stderr, "Conversion/write error at row %d in combine_input()\n",
787 >                                cur_row);
788                  return(0);
789 <            if (inchild > 0 && fflush(stdout) == EOF)
794 <                return(0);
789 >            }
790          }
791 <        return(inchild > 0 || fflush(stdout) != EOF);
797 < memerror:
798 <        fputs("Out of buffer space in combine_input()\n", stderr);
799 <        return(0);
791 >        return(inchild >= 0 || fflush(stdout) != EOF);
792   multerror:
793          fputs("Unexpected matrix multiply error in combine_input()\n", stderr);
794          return(0);
795   }
796  
797 < static int
797 > int
798 > output_loop(void)
799 > {
800 >        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
801 >        int             cur_child = 0;
802 >        int             i = nmats;
803 >
804 >        while (i-- > 0) {                               /* free input buffers */
805 >                rmx_reset(&mop[i].imx);
806 >                if (mop[i].rmp != &mop[i].imx) {
807 >                        rmx_free(mop[i].rmp);
808 >                        mop[i].rmp = &mop[i].imx;
809 >                }
810 >        }
811 >        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
812 >                rmx_reset(&mop[nmats].imx);
813 > #ifdef getc_unlocked
814 >        flockfile(stdout);                              /* we own this, now */
815 > #endif
816 >        for ( ; ; ) {                                   /* loop until no more */
817 >                ssize_t         rv;
818 >                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
819 >                if (!rv)                                /* out of rows? */
820 >                        break;
821 >                if (rv != row_size) {
822 >                        fputs("Read error in output_loop()\n", stderr);
823 >                        return(0);
824 >                }                                       /* do final conversion */
825 >                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
826 >                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
827 >                        fputs("Conversion/write error in output_loop()\n", stderr);
828 >                        return(0);
829 >                }
830 >                cur_child++;
831 >                cur_child *= (cur_child < inchild);     /* loop over workers */
832 >        }
833 >        return(fflush(stdout) != EOF);
834 > }
835 >
836 > int
837   get_factors(double da[], int n, char *av[])
838   {
839          int     ac;
# Line 812 | Line 843 | get_factors(double da[], int n, char *av[])
843          return(ac);
844   }
845  
846 < static void
846 > void
847   resize_inparr(int n2alloc)
848   {
849          int     i;
# Line 914 | Line 945 | main(int argc, char *argv[])
945                                  }
946                                  break;
947                          case 'C':
948 +                                mcat_last = 0;
949                                  if (!n || isflt(argv[i+1]))
950                                          goto userr;
951                                  defCsym = mop[nmats].preop.csym = argv[++i];
952                                  mop[nmats].preop.clen = 0;
921                                mcat_last = 0;
953                                  break;
954                          case 'c':
955 +                                mcat_last = 0;
956                                  if (n && !isflt(argv[i+1])) {
957                                          mop[nmats].preop.csym = argv[++i];
958                                          mop[nmats].preop.clen = 0;
927                                        mcat_last = 0;
959                                          break;
960                                  }
961                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 937 | Line 968 | main(int argc, char *argv[])
968                                          goto userr;
969                                  }
970                                  mop[nmats].preop.csym = NULL;
940                                mcat_last = 0;
971                                  break;
972                          case 'm':
973 +                                mcat_last = 1;
974                                  if (!n) goto userr;
975                                  if (argv[++i][0] == '-' && !argv[i][1]) {
976                                          if (stdin_used++) goto stdin_error;
977                                          mcat_spec = stdin_name;
978                                  } else
979                                          mcat_spec = argv[i];
949                                mcat_last = 1;
980                                  break;
981                          default:
982                                  fprintf(stderr, "%s: unknown option '%s'\n",
# Line 1015 | Line 1045 | main(int argc, char *argv[])
1045                  return(1);
1046          }
1047          doptimize(1);                   /* optimize definitions */
1048 <        if (spawned_children(nproc))    /* running in parent process? */
1048 >        i = spawned_children(nproc);    /* create multiple processes if requested */
1049 >        if (i > 0)                      /* running in parent process? */
1050                  return(parent_loop() ? 0 : 1);
1051 <                                        /* process & write rows */
1051 >        if (i < 0)                      /* running in output process? */
1052 >                return(output_loop() ? 0 : 1);
1053 >                                        /* else we are a worker process */
1054          return(combine_input() ? 0 : 1);
1055   stdin_error:
1056          fprintf(stderr, "%s: %s used for more than one input\n",

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines