ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.10 by greg, Mon May 20 23:21:29 2024 UTC vs.
Revision 2.18 by greg, Thu May 23 19:29:41 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the single parent.  Output to
9 + * a shared stdout is assured because each child waits for parent
10 + * to be ready with next input row before it outputs the previous result.
11 + * The assumption (checked in spawn_children()) is that the parent
12 + * will remain blocked until the child it is feeding has finished
13 + * flushing the previous row to stdout.  The final output row is
14 + * less guaranteed to be in order, so the parent sleeps for a few seconds
15 + * between each child pipe closure, in hopes that this allows enough
16 + * time for row output to finish in each process.
17   */
18  
19   #include <math.h>
# Line 543 | Line 554 | output_headinfo(FILE *fp)
554   static int
555   spawned_children(int np)
556   {
557 +        size_t  recsize = 0;
558          int     i, rv;
559  
560   #if defined(_WIN32) || defined(_WIN64)
# Line 551 | Line 563 | spawned_children(int np)
563                  np = 1;
564          } else
565   #endif
566 <        if ((in_nrows > 0) & (np > in_nrows))
567 <                np = in_nrows;
566 >        if ((in_nrows > 0) & (np*4 > in_nrows))
567 >                np = in_nrows/4;
568                                  /* we'll be doing a row at a time */
569          for (i = 0; i < nmats; i++) {
570                  mop[i].imx.nrows = 1;
571                  if (!rmx_prepare(&mop[i].imx))
572                          goto memerror;
573 +                recsize += rmx_array_size(&mop[i].imx);
574                  if (mop[i].rmp != &mop[i].imx) {
575                          mop[i].rmp->nrows = 1;
576                          if (!rmx_prepare(mop[i].rmp))
# Line 578 | Line 591 | spawned_children(int np)
591          mop[nmats].imx.nrows = 1;
592          if (!rmx_prepare(&mop[nmats].imx))
593                  goto memerror;
594 <        if (np <= 1) {          /* single process return point */
594 >        if (np <= 1) {          /* single process return */
595   #ifdef getc_unlocked
596                  for (i = 0; i < nmats; i++)
597                          flockfile(mop[i].infp);
# Line 598 | Line 611 | spawned_children(int np)
611                  cproc[i].pid = -1;
612                  rv = open_process(&cproc[i], NULL);
613                  if (rv <= 0) break;
614 +                if (!i && 2*rv >= recsize) {
615 +                        fputs("Problem too small for multi-processing\n",
616 +                                        stderr);
617 +                        close_processes(cproc, 1);
618 +                        exit(1);
619 +                }
620          }
602        if (rv > 0)
603                return(1);      /* parent return value */
621          if (rv < 0) {
622                  perror("fork");
623 +                close_processes(cproc, i);
624                  exit(1);
625          }
626 <        inchild = i;            /* child index */
626 >        if (rv > 0)             /* parent return? */
627 >                return(1);
628 >        inchild = i;            /* our child index */
629 >        while (i-- > 0)         /* don't share siblings' pipes */
630 >                close(cproc[i].w);
631          fpurge(stdin);          /* discard previous matrix input */
632   #ifdef getc_unlocked
633          flockfile(stdin);
634   #endif
635          for (i = 0; i < nmats; i++) {
636                  if (mop[i].infp != stdin)
637 <                        fclose(mop[i].infp);
637 >                        fclose(mop[i].infp);    /* ! pclose() */
638                  mop[i].infp = stdin;
639                  mop[i].imx.dtype = DTdouble;
640 +                mop[i].imx.pflags &= ~RMF_SWAPIN;
641          }
642          return(0);              /* child return */
643   memerror:
# Line 642 | Line 665 | parent_loop()
665   #endif
666          for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
667              FILE        *ofp = outfp[cur_row % nchildren];
668 <            for (i = 0; i < nmats; i++) {
668 >            for (i = 0; i < nmats; i++)
669                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
670                          if (cur_row > in_nrows) /* unknown #input rows? */
671                                  break;
# Line 650 | Line 673 | parent_loop()
673                                          mop[i].inspec, cur_row);
674                          return(0);
675                  }
653                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
654                                mop[i].imx.ncols, DTdouble, ofp))
655                        return(0);
656            }
676              if (i < nmats)
677                  break;
678 +            for (i = 0; i < nmats; i++)
679 +                if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
680 +                                        mop[i].imx.ncols, DTdouble, ofp))
681 +                        return(0);
682              if (fflush(ofp) == EOF)
683                  return(0);
684          }
685 <        for (i = 0; i < nchildren; i++)
685 >        for (i = 0; i < nchildren; i++) {       /* maintain output order */
686 >                sleep(1+(mop[nmats].rmp->ncols*mop[nmats].rmp->ncomp >> 15));
687                  fclose(outfp[i]);
688 +        }
689          free(outfp);
690          i = close_processes(cproc, nchildren);
691          free(cproc); cproc = NULL;
# Line 706 | Line 731 | combine_input()
731                  set_c = varlookup("c") != NULL && !vardefined("c");
732          } else                          /* save a little time */
733                  set_r = set_c = 0;
734 +
735                                          /* read/process row-by-row */
736          for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
737              RMATRIX     *mres = NULL;
# Line 758 | Line 784 | combine_input()
784                          return(0);
785              }
786              rmx_free(mres); mres = NULL;
787 <            if (inchild >= 0) {
788 <                i = getc(stdin);        /* child waits for turn to output */
787 >            if (inchild >= 0) {         /* children share stdout */
788 >                i = getc(stdin);        /* signals it's our turn */
789                  if (i != EOF) ungetc(i, stdin);
790              }
791              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
# Line 899 | Line 925 | main(int argc, char *argv[])
925                                  if (n && !isflt(argv[i+1])) {
926                                          mop[nmats].preop.csym = argv[++i];
927                                          mop[nmats].preop.clen = 0;
928 +                                        mcat_last = 0;
929                                          break;
930                                  }
931                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 989 | Line 1016 | main(int argc, char *argv[])
1016                  return(1);
1017          }
1018          doptimize(1);                   /* optimize definitions */
1019 <        if (spawned_children(nproc))    /* running in children? */
1019 >        if (spawned_children(nproc))    /* running in parent process? */
1020                  return(parent_loop() ? 0 : 1);
1021                                          /* process & write rows */
1022          return(combine_input() ? 0 : 1);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines