ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.8 by greg, Thu May 16 18:59:19 2024 UTC vs.
Revision 2.19 by greg, Mon Jun 3 18:55:51 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the original process.  Final conversion
9 + * and output to stdout is sorted by first child while its siblings send it
10 + * their record calculations.
11   */
12  
8 #include <errno.h>
13   #include <math.h>
14   #include "platform.h"
15 + #include "rtprocess.h"
16   #include "rtio.h"
17   #include "rmatrix.h"
18   #include "calcomp.h"
14 #include "paths.h"
19  
20   #ifndef M_PI
21   #define M_PI    3.14159265358979323846
# Line 54 | Line 58 | int            cur_row;                        /* current input/output row */
58   int             cur_col;                        /* current input/output column */
59   int             cur_chan;                       /* if we're looping channels */
60  
61 + SUBPROC         *cproc = NULL;                  /* child process array */
62 + int             nchildren = 0;                  /* # of child processes */
63 + int             inchild = -1;                   /* our child ID (-1: parent) */
64 +
65   static int      checksymbolic(ROPMAT *rop);
66  
67   static int
# Line 390 | Line 398 | apply_op(RMATRIX *dst, const RMATRIX *src, const RUNAR
398                          return(0);
399                  rmx_free(res);
400          } else if (dst != src)
401 <                memcpy(dst->mtx, src->mtx,
394 <                                sizeof(double)*dst->ncomp*dst->ncols*dst->nrows);
401 >                memcpy(dst->mtx, src->mtx, rmx_array_size(dst));
402          if (ro->nsf == dst->ncomp)
403                  rmx_scale(dst, ro->sca);
404          return(1);
# Line 538 | Line 545 | output_headinfo(FILE *fp)
545          }
546   }
547  
548 + static void
549 + reset_inputs(void)
550 + {
551 +        int     i = nmats;
552 +
553 +        fpurge(stdin);          /* discard previous matrix input */
554 +        while (i--) {
555 +                if (mop[i].infp != stdin)
556 +                        fclose(mop[i].infp);    /* ! pclose() */
557 +                mop[i].infp = stdin;
558 +                mop[i].imx.dtype = DTdouble;
559 +                mop[i].imx.pflags &= ~RMF_SWAPIN;
560 +        }
561 + }
562 +
563   static int
564 < combine_input(ROPMAT *res, FILE *fout)
564 > spawned_children(int np)
565   {
566 <        int     set_r, set_c;
567 <        RMATRIX *tmp = NULL;
568 <        int     co_set;
569 <        int     i;
570 <                                        /* allocate input row buffers */
566 >        size_t  recsize = 0;
567 >        int     i, rv;
568 >
569 > #if defined(_WIN32) || defined(_WIN64)
570 >        if (np > 1) {
571 >                fputs("Warning: only one process under Windows\n", stderr);
572 >                np = 1;
573 >        } else
574 > #endif
575 >        if ((in_nrows > 0) & (np*4 > in_nrows))
576 >                np = in_nrows/4;
577 >                                /* we'll be doing a row at a time */
578          for (i = 0; i < nmats; i++) {
579 <                mop[i].imx.nrows = 1;   /* we'll be doing a row at a time */
579 >                mop[i].imx.nrows = 1;
580                  if (!rmx_prepare(&mop[i].imx))
581                          goto memerror;
582 +                recsize += rmx_array_size(&mop[i].imx);
583                  if (mop[i].rmp != &mop[i].imx) {
584                          mop[i].rmp->nrows = 1;
585                          if (!rmx_prepare(mop[i].rmp))
586                                  goto memerror;
587                  }
588          }
589 <                                        /* prep output row buffers */
590 <        if (mcat || res->preop.clen > 0) {
591 <                if (!split_input(res))  /* need separate buffer */
589 >                                /* prep output row buffer */
590 >        if (mcat || mop[nmats].preop.clen > 0) {
591 >                if (!split_input(&mop[nmats]))  /* need separate buffer */
592                          return(0);
593 <                if (res->preop.clen > 0)
594 <                        res->rmp->ncomp = res->preop.clen / res->imx.ncomp;
595 <                res->rmp->nrows = 1;
596 <                if (!mcat | !mcat_last && !rmx_prepare(res->rmp))
593 >                if (mop[nmats].preop.clen > 0)
594 >                        mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
595 >                                                mop[nmats].imx.ncomp;
596 >                mop[nmats].rmp->nrows = 1;
597 >                if (!mcat | !mcat_last && !rmx_prepare(mop[nmats].rmp))
598                          goto memerror;
599          }
600 +        mop[nmats].imx.nrows = 1;
601 +        if (!rmx_prepare(&mop[nmats].imx))
602 +                goto memerror;
603 +        if (np <= 1) {          /* single process return */
604 + #ifdef getc_unlocked
605 +                for (i = 0; i < nmats; i++)
606 +                        flockfile(mop[i].infp);
607 +                flockfile(stdout);
608 + #endif
609 +                return(0);
610 +        }
611 +        fflush(stdout);         /* flush header & spawn children */
612 +        nchildren = np+1;
613 +        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
614 +        if (!cproc)
615 +                goto memerror;
616 +                                /* first child becomes stdout filter */
617 +        cproc[0].flags = PF_FILT_OUT;
618 +        cproc[0].w = fileno(stdout);
619 +        cproc[0].r = 0;
620 +        cproc[0].pid = -1;
621 +        rv = open_process(&cproc[i], NULL);
622 +        if (rv < 0) {
623 +                perror("fork");
624 +                exit(1);
625 +        }
626 +        if (rv == 0) {          /* loop if first child */
627 +                inchild = 0;
628 +                reset_inputs();
629 +                output_loop();
630 +                _exit(0);
631 +        }
632 +                                /* start other children */
633 +        for (i = 1; (i < nchildren) & (rv > 0); i++) {
634 +                cproc[i].flags = 0;
635 +                cproc[i].w = 1;
636 +                cproc[i].r = 0;
637 +                cproc[i].pid = -1;
638 +                rv = open_process(&cproc[i], NULL);
639 +        }
640 +        if (rv < 0) {
641 +                perror("fork");
642 +                close_processes(cproc, i);
643 +                exit(1);
644 +        }
645 +        if (rv > 0)             /* parent return? */
646 +                return(1);
647 +        inchild = i;            /* our child index */
648 +        while (--i > 0)         /* don't share siblings' pipes */
649 +                close(cproc[i].w);
650 +        reset_inputs();
651 + #ifdef getc_unlocked
652 +        flockfile(stdin);
653 + #endif
654 +        return(0);              /* child return */
655 + memerror:
656 +        fputs("Out of memory in spawned_children()\n", stderr);
657 +        exit(1);
658 + }
659 +
660 + static int
661 + parent_loop(void)
662 + {
663 +        int     i;
664 +
665 +        rmx_reset(&mop[nmats].imx);             /* not doing output side */
666 +        if (mop[nmats].rmp != &mop[nmats].imx)
667 +                rmx_reset(mop[nmats].rmp);
668 + #ifdef getc_unlocked
669 +        for (i = 0; i < nmats; i++)
670 +                flockfile(mop[i].infp);
671 + #endif
672 +                                                /* load & send rows to kids */
673 +        for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
674 +            int wfd = cproc[1 + cur_row%(nchildren-1)].w;
675 +            for (i = 0; i < nmats; i++)
676 +                if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
677 +                        if (cur_row > in_nrows) /* unknown #input rows? */
678 +                                break;
679 +                        fprintf(stderr, "%s: read error at row %d\n",
680 +                                        mop[i].inspec, cur_row);
681 +                        return(0);
682 +                }
683 +            if (i < nmats)
684 +                break;
685 +            for (i = 0; i < nmats; i++)
686 +                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
687 +                                        != rmx_array_size(&mop[i].imx))
688 +                        return(0);
689 +        }
690 +        i = close_processes(cproc, nchildren);
691 +        free(cproc); cproc = NULL;
692 +        if (i < 0) {
693 +                fputs("Warning: missing child in parent_loop()\n", stderr);
694 +                return(1);
695 +        }
696 +        if (i > 0) {
697 +                fprintf(stderr, "Child exited with status %d\n", i);
698 +                return(0);
699 +        }
700 +        return(1);
701 + memerror:
702 +        fputs("Out of memory in parent_loop()\n", stderr);
703 +        exit(1);
704 + }
705 +
706 + static int
707 + combine_input(void)
708 + {
709 +        const int       row0 = (inchild > 0)*(inchild-1);
710 +        const int       rstep = nchildren + !nchildren;
711 +        ROPMAT          *res = &mop[nmats];
712 +        int             set_r, set_c;
713 +        RMATRIX         *tmp = NULL;
714 +        int             co_set;
715 +        int             i;
716 +
717          if (mcat && mcat_last &&
718                          !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp)))
719                  goto memerror;
572        res->imx.nrows = 1;
573        if (!rmx_prepare(&res->imx))
574                goto memerror;
720                                          /* figure out what the user set */
721          co_set = fundefined("co");
722          if (!co_set)
# Line 587 | Line 732 | combine_input(ROPMAT *res, FILE *fout)
732          } else                          /* save a little time */
733                  set_r = set_c = 0;
734                                          /* read/process row-by-row */
735 <        for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
735 >        for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
736              RMATRIX     *mres = NULL;
737 <            for (i = 0; i < nmats; i++) {
737 >            for (i = 0; i < nmats; i++)
738                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
739                          if (cur_row > in_nrows) /* unknown #input rows? */
740 <                                goto loop_exit;
740 >                                break;
741                          fprintf(stderr, "%s: read error at row %d\n",
742                                          mop[i].inspec, cur_row);
743                          return(0);
744                  }
745 +            if (i < nmats)
746 +                break;
747 +            for (i = 0; i < nmats; i++)
748                  if (!apply_op(mop[i].rmp, &mop[i].imx, &mop[i].preop))
749                          return(0);
602            }
750              if (set_r) varset("r", '=', cur_row);
751              for (cur_col = 0; cur_col < in_ncols; cur_col++) {
752                  if (set_c) varset("c", '=', cur_col);
# Line 636 | Line 783 | combine_input(ROPMAT *res, FILE *fout)
783                          return(0);
784              }
785              rmx_free(mres); mres = NULL;
786 +            if (inchild > 0) {          /* children share stdout */
787 +                i = getc(stdin);        /* signals it's our turn */
788 +                if (i != EOF) ungetc(i, stdin);
789 +            }
790              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
791 <                                res->rmp->ncols, res->rmp->dtype, fout))
791 >                                res->rmp->ncols, res->rmp->dtype, stdout))
792                  return(0);
793 +            if (inchild > 0 && fflush(stdout) == EOF)
794 +                return(0);
795          }
796 < loop_exit:
644 < #if 0           /* we're about to exit, so who cares? */
645 <        rmx_free(tmp);                  /* clean up */
646 <        rmx_reset(res->rmp);
647 <        rmx_reset(&res->imx);
648 <        for (i = 0; i < nmats; i++) {
649 <                rmx_reset(mop[i].rmp);
650 <                rmx_reset(&mop[i].imx);
651 <                if (mop[i].inspec[0] == '!')
652 <                        pclose(mop[i].infp);
653 <                else if (mop[i].inspec != stdin_name)
654 <                        fclose(mop[i].infp);
655 <                mop[i].infp = NULL;
656 <        }
657 < #endif
658 <        return(fflush(fout) != EOF);
796 >        return(inchild > 0 || fflush(stdout) != EOF);
797   memerror:
798          fputs("Out of buffer space in combine_input()\n", stderr);
799          return(0);
# Line 705 | Line 843 | main(int argc, char *argv[])
843          const char      *defCsym = NULL;
844          int             echoheader = 1;
845          int             stdin_used = 0;
846 +        int             nproc = 1;
847          const char      *mcat_spec = NULL;
848          int             n2comp = 0;
849          uby8            comp_ndx[128];
# Line 732 | Line 871 | main(int argc, char *argv[])
871                          case 'h':
872                                  echoheader = !echoheader;
873                                  break;
874 +                        case 'n':
875 +                                nproc = atoi(argv[++i]);
876 +                                if (nproc <= 0)
877 +                                        goto userr;
878 +                                break;
879                          case 'e':
880                                  if (!n) goto userr;
881                                  comp_ndx[n2comp++] = i++;
# Line 780 | Line 924 | main(int argc, char *argv[])
924                                  if (n && !isflt(argv[i+1])) {
925                                          mop[nmats].preop.csym = argv[++i];
926                                          mop[nmats].preop.clen = 0;
927 +                                        mcat_last = 0;
928                                          break;
929                                  }
930                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 870 | Line 1015 | main(int argc, char *argv[])
1015                  return(1);
1016          }
1017          doptimize(1);                   /* optimize definitions */
1018 +        if (spawned_children(nproc))    /* running in parent process? */
1019 +                return(parent_loop() ? 0 : 1);
1020                                          /* process & write rows */
1021 <        return(combine_input(&mop[nmats], stdout) ? 0 : 1);
1021 >        return(combine_input() ? 0 : 1);
1022   stdin_error:
1023          fprintf(stderr, "%s: %s used for more than one input\n",
1024                          argv[0], stdin_name);
1025          return(1);
1026   userr:
1027          fprintf(stderr,
1028 <        "Usage: %s [-h][-f{adfc}][-e expr][-f file][-s sf .. | -c ce ..] m1 .. -m mcat > mres\n",
1028 >        "Usage: %s [-h][-f{adfc}][-n nproc][-e expr][-f file][-s sf .. | -c ce ..] m1 .. -m mcat > mres\n",
1029                          argv[0]);
1030          return(1);
1031   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines