ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/rcomb.c
(Generate patch)

Comparing ray/src/util/rcomb.c (file contents):
Revision 2.8 by greg, Thu May 16 18:59:19 2024 UTC vs.
Revision 2.20 by greg, Tue Jun 4 21:23:11 2024 UTC

# Line 3 | Line 3 | static const char RCSid[] = "$Id$";
3   #endif
4   /*
5   * General component matrix combiner, operating on a row at a time.
6 + *
7 + * Multi-processing mode under Unix creates children that each work
8 + * on one input row at a time, fed by the original process.  Final conversion
9 + * and output to stdout is sorted by last child while its siblings send it
10 + * their record calculations.
11   */
12  
8 #include <errno.h>
13   #include <math.h>
14   #include "platform.h"
15 + #include "rtprocess.h"
16   #include "rtio.h"
17   #include "rmatrix.h"
18   #include "calcomp.h"
14 #include "paths.h"
19  
20   #ifndef M_PI
21   #define M_PI    3.14159265358979323846
# Line 54 | Line 58 | int            cur_row;                        /* current input/output row */
58   int             cur_col;                        /* current input/output column */
59   int             cur_chan;                       /* if we're looping channels */
60  
61 + SUBPROC         *cproc = NULL;                  /* child process array */
62 + int             nchildren = 0;                  /* # of child processes */
63 + int             inchild = -1;                   /* our child ID (-1: parent) */
64 +
65   static int      checksymbolic(ROPMAT *rop);
66  
67   static int
# Line 390 | Line 398 | apply_op(RMATRIX *dst, const RMATRIX *src, const RUNAR
398                          return(0);
399                  rmx_free(res);
400          } else if (dst != src)
401 <                memcpy(dst->mtx, src->mtx,
394 <                                sizeof(double)*dst->ncomp*dst->ncols*dst->nrows);
401 >                memcpy(dst->mtx, src->mtx, rmx_array_size(dst));
402          if (ro->nsf == dst->ncomp)
403                  rmx_scale(dst, ro->sca);
404          return(1);
# Line 539 | Line 546 | output_headinfo(FILE *fp)
546   }
547  
548   static int
549 < combine_input(ROPMAT *res, FILE *fout)
549 > output_loop(void)
550   {
551 <        int     set_r, set_c;
552 <        RMATRIX *tmp = NULL;
553 <        int     co_set;
554 <        int     i;
555 <                                        /* allocate input row buffers */
551 >        const size_t    row_size = rmx_array_size(mop[nmats].rmp);
552 >        int             i = nmats;
553 >        int             cur_child = 0;
554 >
555 >        if (mop[nmats].rmp != &mop[nmats].imx)          /* output is split? */
556 >                rmx_reset(&mop[nmats].imx);
557 >        while (i-- > 0) {                               /* close input matrices */
558 >                fclose(mop[i].infp);            /* ! pclose() */
559 >                mop[i].infp = NULL;
560 >                rmx_reset(&mop[i].imx);
561 >                if (mop[i].rmp != &mop[i].imx) {
562 >                        rmx_free(mop[i].rmp);
563 >                        mop[i].rmp = &mop[i].imx;
564 >                }
565 >        }
566 > #ifdef getc_unlocked
567 >        flockfile(stdout);                              /* we own this, now */
568 > #endif
569 >        for ( ; ; ) {                                   /* loop until no more */
570 >                ssize_t         rv;
571 >                rv = readbuf(cproc[cur_child].r, mop[nmats].rmp->mtx, row_size);
572 >                if (!rv)                                /* out of rows? */
573 >                        break;
574 >                if (rv != row_size) {
575 >                        fputs("Read error in output loop\n", stderr);
576 >                        return(0);
577 >                }                                       /* do final conversion */
578 >                if (!rmx_write_data(mop[nmats].rmp->mtx, mop[nmats].rmp->ncomp,
579 >                                mop[nmats].rmp->ncols, mop[nmats].rmp->dtype, stdout)) {
580 >                        fputs("Conversion/write error in output loop\n", stderr);
581 >                        return(0);
582 >                }
583 >                cur_child++;
584 >                cur_child *= (cur_child < inchild);
585 >        }
586 >        return(fflush(stdout) != EOF);
587 > }
588 >
589 > static int
590 > spawned_children(int np)
591 > {
592 >        int     i, rv;
593 >
594 > #if defined(_WIN32) || defined(_WIN64)
595 >        if (np > 1) {
596 >                fputs("Warning: only one process under Windows\n", stderr);
597 >                np = 1;
598 >        } else
599 > #endif
600 >        if ((in_nrows > 0) & (np*4 > in_nrows))
601 >                np = in_nrows/4;
602 >                                /* we'll be doing a row at a time */
603          for (i = 0; i < nmats; i++) {
604 <                mop[i].imx.nrows = 1;   /* we'll be doing a row at a time */
604 >                mop[i].imx.nrows = 1;
605                  if (!rmx_prepare(&mop[i].imx))
606                          goto memerror;
607                  if (mop[i].rmp != &mop[i].imx) {
# Line 556 | Line 610 | combine_input(ROPMAT *res, FILE *fout)
610                                  goto memerror;
611                  }
612          }
613 <                                        /* prep output row buffers */
614 <        if (mcat || res->preop.clen > 0) {
615 <                if (!split_input(res))  /* need separate buffer */
613 >                                /* prep output row buffer(s) */
614 >        if (mcat || mop[nmats].preop.clen > 0) {
615 >                if (!split_input(&mop[nmats]))  /* need separate buffer */
616                          return(0);
617 <                if (res->preop.clen > 0)
618 <                        res->rmp->ncomp = res->preop.clen / res->imx.ncomp;
619 <                res->rmp->nrows = 1;
620 <                if (!mcat | !mcat_last && !rmx_prepare(res->rmp))
617 >                if (mop[nmats].preop.clen > 0)
618 >                        mop[nmats].rmp->ncomp = mop[nmats].preop.clen /
619 >                                                mop[nmats].imx.ncomp;
620 >        }
621 >        mop[nmats].imx.nrows = 1;
622 >        if (!rmx_prepare(&mop[nmats].imx))
623 >                goto memerror;
624 >        if (mop[nmats].rmp != &mop[nmats].imx) {
625 >                mop[nmats].rmp->nrows = 1;
626 >                if (!rmx_prepare(mop[nmats].rmp))
627                          goto memerror;
628          }
629 +        if (np <= 1) {          /* single process return */
630 + #ifdef getc_unlocked
631 +                for (i = 0; i < nmats; i++)
632 +                        flockfile(mop[i].infp);
633 +                flockfile(stdout);
634 + #endif
635 +                return(0);
636 +        }
637 +        fflush(stdout);         /* flush header & spawn children */
638 +        nchildren = np + 1;     /* extra child to sequence output */
639 +        cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
640 +        if (!cproc)
641 +                goto memerror;
642 +        for (i = nchildren; i--; ) cproc[i] = sp_inactive;
643 +        cproc[nchildren-1].flags |= PF_FILT_OUT;
644 +                                /* start each child */
645 +        for (i = 0; i < nchildren; i++) {
646 +                rv = open_process(&cproc[i], NULL);
647 +                if (rv <= 0) break;
648 +        }
649 +        if (rv < 0) {
650 +                perror("fork");
651 +                close_processes(cproc, i);
652 +                exit(1);
653 +        }
654 +        if (rv) {               /* are we the parent? */
655 +                i = nchildren-1;        /* last child is sole reader */
656 +                while (i-- > 0)
657 +                        close(cproc[i].r);
658 +                return(1);      /* parent return value */
659 +        }
660 +        inchild = i;            /* our child index */
661 +        while (i-- > 0)         /* only parent writes siblings */
662 +                close(cproc[i].w);
663 +
664 +        if (inchild == nchildren-1)     /* last child sequences output */
665 +                exit(output_loop() ? 0 : 1);
666 +
667 +        i = inchild;            /* won't read from siblings */
668 +        while (i-- > 0)
669 +                close(cproc[i].r);
670 +        i = nmats;              /* close input matrix streams */
671 +        while (i-- > 0) {
672 +                if (mop[i].infp != stdin)
673 +                        fclose(mop[i].infp);    /* ! pclose() */
674 +                mop[i].infp = stdin;
675 +                mop[i].imx.dtype = DTrmx_native;
676 +                mop[i].imx.pflags &= ~RMF_SWAPIN;
677 +        }
678 +        fpurge(stdin);          /* discard any previous matrix input */
679 + #ifdef getc_unlocked
680 +        flockfile(stdin);
681 + #endif
682 +        mop[nmats].rmp->dtype = DTrmx_native;
683 +        return(0);              /* worker child return value */
684 + memerror:
685 +        fputs("Out of memory in spawned_children()\n", stderr);
686 +        exit(1);
687 + }
688 +
689 + static int
690 + parent_loop(void)
691 + {
692 +        int     i;
693 +
694 +        rmx_reset(&mop[nmats].imx);             /* not touching output side */
695 +        if (mop[nmats].rmp != &mop[nmats].imx) {
696 +                rmx_free(mop[nmats].rmp);
697 +                mop[nmats].rmp = &mop[nmats].imx;
698 +        }
699 + #ifdef getc_unlocked
700 +        for (i = 0; i < nmats; i++)             /* we handle matrix inputs */
701 +                flockfile(mop[i].infp);
702 + #endif
703 +                                                /* load & send rows to kids */
704 +        for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
705 +            int         wfd = cproc[cur_row % (nchildren-1)].w;
706 +            for (i = 0; i < nmats; i++)
707 +                if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
708 +                        if (cur_row > in_nrows) /* unknown #input rows? */
709 +                                break;
710 +                        fprintf(stderr, "%s: parent read error at row %d\n",
711 +                                        mop[i].inspec, cur_row);
712 +                        return(0);
713 +                }
714 +            if (i < nmats)
715 +                break;
716 +            for (i = 0; i < nmats; i++)
717 +                if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
718 +                                        != rmx_array_size(&mop[i].imx))
719 +                        return(0);
720 +        }
721 +        i = close_processes(cproc, nchildren);
722 +        free(cproc); cproc = NULL; nchildren = 0;
723 +        if (i < 0) {
724 +                fputs("Warning: lost child in parent_loop()\n", stderr);
725 +                return(1);
726 +        }
727 +        if (i > 0) {
728 +                fprintf(stderr, "Child exited with status %d\n", i);
729 +                return(0);
730 +        }
731 +        return(1);                              /* return success! */
732 + memerror:
733 +        fputs("Out of memory in parent_loop()\n", stderr);
734 +        exit(1);
735 + }
736 +
737 + static int
738 + combine_input(void)
739 + {
740 +        const int       row0 = (inchild >= 0)*inchild;
741 +        const int       rstep = nchildren ? nchildren-1 : 1;
742 +        ROPMAT          *res = &mop[nmats];
743 +        int             set_r, set_c;
744 +        RMATRIX         *tmp = NULL;
745 +        int             co_set;
746 +        int             i;
747 +
748          if (mcat && mcat_last &&
749                          !(tmp = rmx_alloc(1, res->imx.ncols, res->rmp->ncomp)))
750                  goto memerror;
572        res->imx.nrows = 1;
573        if (!rmx_prepare(&res->imx))
574                goto memerror;
751                                          /* figure out what the user set */
752          co_set = fundefined("co");
753          if (!co_set)
# Line 587 | Line 763 | combine_input(ROPMAT *res, FILE *fout)
763          } else                          /* save a little time */
764                  set_r = set_c = 0;
765                                          /* read/process row-by-row */
766 <        for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
766 >        for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
767              RMATRIX     *mres = NULL;
768 <            for (i = 0; i < nmats; i++) {
768 >            for (i = 0; i < nmats; i++)
769                  if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
770                          if (cur_row > in_nrows) /* unknown #input rows? */
771 <                                goto loop_exit;
771 >                                break;
772                          fprintf(stderr, "%s: read error at row %d\n",
773                                          mop[i].inspec, cur_row);
774                          return(0);
775                  }
776 +            if (i < nmats)
777 +                break;
778 +            for (i = 0; i < nmats; i++)
779                  if (!apply_op(mop[i].rmp, &mop[i].imx, &mop[i].preop))
780                          return(0);
602            }
781              if (set_r) varset("r", '=', cur_row);
782              for (cur_col = 0; cur_col < in_ncols; cur_col++) {
783                  if (set_c) varset("c", '=', cur_col);
# Line 637 | Line 815 | combine_input(ROPMAT *res, FILE *fout)
815              }
816              rmx_free(mres); mres = NULL;
817              if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
818 <                                res->rmp->ncols, res->rmp->dtype, fout))
818 >                                res->rmp->ncols, res->rmp->dtype, stdout))
819                  return(0);
820 +            if (inchild >= 0 && fflush(stdout) == EOF)
821 +                return(0);
822          }
823 < loop_exit:
644 < #if 0           /* we're about to exit, so who cares? */
645 <        rmx_free(tmp);                  /* clean up */
646 <        rmx_reset(res->rmp);
647 <        rmx_reset(&res->imx);
648 <        for (i = 0; i < nmats; i++) {
649 <                rmx_reset(mop[i].rmp);
650 <                rmx_reset(&mop[i].imx);
651 <                if (mop[i].inspec[0] == '!')
652 <                        pclose(mop[i].infp);
653 <                else if (mop[i].inspec != stdin_name)
654 <                        fclose(mop[i].infp);
655 <                mop[i].infp = NULL;
656 <        }
657 < #endif
658 <        return(fflush(fout) != EOF);
823 >        return(inchild >= 0 || fflush(stdout) != EOF);
824   memerror:
825          fputs("Out of buffer space in combine_input()\n", stderr);
826          return(0);
# Line 705 | Line 870 | main(int argc, char *argv[])
870          const char      *defCsym = NULL;
871          int             echoheader = 1;
872          int             stdin_used = 0;
873 +        int             nproc = 1;
874          const char      *mcat_spec = NULL;
875          int             n2comp = 0;
876          uby8            comp_ndx[128];
# Line 732 | Line 898 | main(int argc, char *argv[])
898                          case 'h':
899                                  echoheader = !echoheader;
900                                  break;
901 +                        case 'n':
902 +                                nproc = atoi(argv[++i]);
903 +                                if (nproc <= 0)
904 +                                        goto userr;
905 +                                break;
906                          case 'e':
907                                  if (!n) goto userr;
908                                  comp_ndx[n2comp++] = i++;
# Line 780 | Line 951 | main(int argc, char *argv[])
951                                  if (n && !isflt(argv[i+1])) {
952                                          mop[nmats].preop.csym = argv[++i];
953                                          mop[nmats].preop.clen = 0;
954 +                                        mcat_last = 0;
955                                          break;
956                                  }
957                                  if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;
# Line 870 | Line 1042 | main(int argc, char *argv[])
1042                  return(1);
1043          }
1044          doptimize(1);                   /* optimize definitions */
1045 +        if (spawned_children(nproc))    /* running in parent process? */
1046 +                return(parent_loop() ? 0 : 1);
1047                                          /* process & write rows */
1048 <        return(combine_input(&mop[nmats], stdout) ? 0 : 1);
1048 >        return(combine_input() ? 0 : 1);
1049   stdin_error:
1050          fprintf(stderr, "%s: %s used for more than one input\n",
1051                          argv[0], stdin_name);
1052          return(1);
1053   userr:
1054          fprintf(stderr,
1055 <        "Usage: %s [-h][-f{adfc}][-e expr][-f file][-s sf .. | -c ce ..] m1 .. -m mcat > mres\n",
1055 >        "Usage: %s [-h][-f{adfc}][-n nproc][-e expr][-f file][-s sf .. | -c ce ..] m1 .. -m mcat > mres\n",
1056                          argv[0]);
1057          return(1);
1058   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines