ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.15 by greg, Thu May 23 01:28:07 2024 UTC vs.
Revision 2.19 by greg, Mon Jun 3 18:55:51 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the original process.  Final conversion
9 + * and output to stdout is sorted by first child while its siblings send it
10 + * their record calculations.
11   */
12  
8 #include <signal.h>
13   #include <math.h>
14   #include "platform.h"
15   #include "rtprocess.h"
# Line 57 | Line 61 | int            cur_chan;                       /* if we're looping channels */
61   SUBPROC         *cproc = NULL;                  /* child process array */
62   int             nchildren = 0;                  /* # of child processes */
63   int             inchild = -1;                   /* our child ID (-1: parent) */
60 int             pgid = -1;                      /* process group ID */
61 int             nr_out = 0;                     /* # of rows output by kids */
64  
65   static int      checksymbolic(ROPMAT *rop);
66  
65 static void
66 on_sigio(int dummy)
67 {
68        nr_out++;                       /* happens when child outputs row */
69 }
70
67   static int
68   split_input(ROPMAT *rop)
69   {
# Line 549 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 + static void
549 + reset_inputs(void)
550 + {
551 +        int     i = nmats;
552 +
553 +        fpurge(stdin);          /* discard previous matrix input */
554 +        while (i--) {
555 +                if (mop[i].infp != stdin)
556 +                        fclose(mop[i].infp);    /* ! pclose() */
557 +                mop[i].infp = stdin;
558 +                mop[i].imx.dtype = DTdouble;
559 +                mop[i].imx.pflags &= ~RMF_SWAPIN;
560 +        }
561 + }
562 +
563   static int
564   spawned_children(int np)
565   {
566 +        size_t  recsize = 0;
567          int     i, rv;
568  
569   #if defined(_WIN32) || defined(_WIN64)
# Line 560 | Line 572 | spawned_children(int np)
572                  np = 1;
573          } else
574   #endif
575 <        if ((in_nrows > 0) & (np > in_nrows))
576 <                np = in_nrows;
575 >        if ((in_nrows > 0) & (np*4 > in_nrows))
576 >                np = in_nrows/4;
577                                  /* we'll be doing a row at a time */
578          for (i = 0; i < nmats; i++) {
579                  mop[i].imx.nrows = 1;
580                  if (!rmx_prepare(&mop[i].imx))
581                          goto memerror;
582 +                recsize += rmx_array_size(&mop[i].imx);
583                  if (mop[i].rmp != &mop[i].imx) {
584                          mop[i].rmp->nrows = 1;
585                          if (!rmx_prepare(mop[i].rmp))
# Line 587 | Line 600 | spawned_children(int np)
600          mop[nmats].imx.nrows = 1;
601          if (!rmx_prepare(&mop[nmats].imx))
602                  goto memerror;
603 <        if (np <= 1) {          /* single process return point */
603 >        if (np <= 1) {          /* single process return */
604   #ifdef getc_unlocked
605                  for (i = 0; i < nmats; i++)
606                          flockfile(mop[i].infp);
# Line 595 | Line 608 | spawned_children(int np)
608   #endif
609                  return(0);
610          }
598        pgid = setpgrp();       /* set process group ID */
599        signal(SIGIO, on_sigio);
611          fflush(stdout);         /* flush header & spawn children */
612 <        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np);
612 >        nchildren = np+1;
613 >        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
614          if (!cproc)
615                  goto memerror;
616 <        nchildren = np;
617 <        for (i = 0; i < np; i++) {
618 <                cproc[i].flags = PF_FILT_OUT;
619 <                cproc[i].w = dup(1);
616 >                                /* first child becomes stdout filter */
617 >        cproc[0].flags = PF_FILT_OUT;
618 >        cproc[0].w = fileno(stdout);
619 >        cproc[0].r = 0;
620 >        cproc[0].pid = -1;
621 >        rv = open_process(&cproc[i], NULL);
622 >        if (rv < 0) {
623 >                perror("fork");
624 >                exit(1);
625 >        }
626 >        if (rv == 0) {          /* loop if first child */
627 >                inchild = 0;
628 >                reset_inputs();
629 >                output_loop();
630 >                _exit(0);
631 >        }
632 >                                /* start other children */
633 >        for (i = 1; (i < nchildren) & (rv > 0); i++) {
634 >                cproc[i].flags = 0;
635 >                cproc[i].w = 1;
636                  cproc[i].r = 0;
637                  cproc[i].pid = -1;
638                  rv = open_process(&cproc[i], NULL);
611                if (rv <= 0) break;
639          }
613        if (rv > 0)
614                return(1);      /* parent return value */
640          if (rv < 0) {
641                  perror("fork");
642 +                close_processes(cproc, i);
643                  exit(1);
644          }
645 +        if (rv > 0)             /* parent return? */
646 +                return(1);
647          inchild = i;            /* our child index */
648 <        while (i-- > 0)         /* don't share siblings' pipes */
648 >        while (--i > 0)         /* don't share siblings' pipes */
649                  close(cproc[i].w);
650 <        fpurge(stdin);          /* discard previous matrix input */
650 >        reset_inputs();
651   #ifdef getc_unlocked
652          flockfile(stdin);
653   #endif
626        for (i = 0; i < nmats; i++) {
627                if (mop[i].infp != stdin)
628                        fclose(mop[i].infp);    /* ! pclose() */
629                mop[i].infp = stdin;
630                mop[i].imx.dtype = DTdouble;
631        }
654          return(0);              /* child return */
655   memerror:
656          fputs("Out of memory in spawned_children()\n", stderr);
# Line 636 | Line 658 | memerror:
658   }
659  
660   static int
661 < parent_loop()
661 > parent_loop(void)
662   {
641        FILE    **outfp = (FILE **)malloc(nchildren*sizeof(FILE *));
663          int     i;
664  
665 <        if (!outfp) goto memerror;
666 <        for (i = 0; i < nchildren; i++) {
667 <                outfp[i] = fdopen(cproc[i].w, "w");
647 <                if (!outfp[i]) goto memerror;
665 >        rmx_reset(&mop[nmats].imx);             /* not doing output side */
666 >        if (mop[nmats].rmp != &mop[nmats].imx)
667 >                rmx_reset(mop[nmats].rmp);
668   #ifdef getc_unlocked
649                flockfile(outfp[i]);
650 #endif
651        }
652 #ifdef getc_unlocked
669          for (i = 0; i < nmats; i++)
670                  flockfile(mop[i].infp);
671   #endif
672 +                                                /* load & send rows to kids */
673          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
674 <            FILE        *ofp = outfp[cur_row % nchildren];
674 >            int wfd = cproc[1 + cur_row%(nchildren-1)].w;
675              for (i = 0; i < nmats; i++)
676                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
677                          if (cur_row > in_nrows) /* unknown #input rows? */
# Line 666 | Line 683 | parent_loop()
683              if (i < nmats)
684                  break;
685              for (i = 0; i < nmats; i++)
686 <                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
687 <                                        mop[i].imx.ncols, DTdouble, ofp))
688 <                        return(0);
672 <            if (fflush(ofp) == EOF)
673 <                return(0);
686 >                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
687 >                                        != rmx_array_size(&mop[i].imx))
688 >                        return(0);
689          }
675        for (i = 0; i < nchildren; i++)
676                fclose(outfp[i]);
677        free(outfp);
690          i = close_processes(cproc, nchildren);
691          free(cproc); cproc = NULL;
692          if (i < 0) {
# Line 692 | Line 704 | memerror:
704   }
705  
706   static int
707 < combine_input()
707 > combine_input(void)
708   {
709 <        const int       row0 = (inchild >= 0)*inchild;
709 >        const int       row0 = (inchild > 0)*(inchild-1);
710          const int       rstep = nchildren + !nchildren;
711          ROPMAT          *res = &mop[nmats];
712          int             set_r, set_c;
713          RMATRIX         *tmp = NULL;
714          int             co_set;
703        sigset_t        iomask;
715          int             i;
716  
717          if (mcat && mcat_last &&
# Line 720 | Line 731 | combine_input()
731                  set_c = varlookup("c") != NULL && !vardefined("c");
732          } else                          /* save a little time */
733                  set_r = set_c = 0;
734 <
724 <        sigemptyset(&iomask);           /* read/process row-by-row */
725 <        sigaddset(&iomask, SIGIO);
734 >                                        /* read/process row-by-row */
735          for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
736              RMATRIX     *mres = NULL;
728            if (inchild >= 0) sigprocmask(SIG_BLOCK, &iomask, NULL);
737              for (i = 0; i < nmats; i++)
738                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
739                          if (cur_row > in_nrows) /* unknown #input rows? */
# Line 734 | Line 742 | combine_input()
742                                          mop[i].inspec, cur_row);
743                          return(0);
744                  }
737            if (inchild >= 0) sigprocmask(SIG_UNBLOCK, &iomask, NULL);
745              if (i < nmats)
746                  break;
747              for (i = 0; i < nmats; i++)
# Line 776 | Line 783 | combine_input()
783                          return(0);
784              }
785              rmx_free(mres); mres = NULL;
786 <            if (inchild >= 0) {         /* children share stdout */
787 <                while (nr_out < cur_row)
788 <                    pause();            /* wait for our turn */
782 <                sigprocmask(SIG_BLOCK, &iomask, NULL);
786 >            if (inchild > 0) {          /* children share stdout */
787 >                i = getc(stdin);        /* signals it's our turn */
788 >                if (i != EOF) ungetc(i, stdin);
789              }
790              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
791                                  res->rmp->ncols, res->rmp->dtype, stdout))
792                  return(0);
793 <            if (inchild >= 0) {         /* flush and notify group */
794 <                if (fflush(stdout) == EOF)
789 <                    return(0);
790 <                sigprocmask(SIG_UNBLOCK, &iomask, NULL);
791 <                killpg(pgid, SIGIO);    /* increments everyone's nr_out */
792 <            }
793 >            if (inchild > 0 && fflush(stdout) == EOF)
794 >                return(0);
795          }
796 <        return(inchild >= 0 || fflush(stdout) != EOF);
796 >        return(inchild > 0 || fflush(stdout) != EOF);
797   memerror:
798          fputs("Out of buffer space in combine_input()\n", stderr);
799          return(0);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines