--- ray/src/util/rcomb.c 2024/05/22 00:39:30 2.13 +++ ray/src/util/rcomb.c 2024/06/03 18:55:51 2.19 @@ -1,11 +1,15 @@ #ifndef lint -static const char RCSid[] = "$Id: rcomb.c,v 2.13 2024/05/22 00:39:30 greg Exp $"; +static const char RCSid[] = "$Id: rcomb.c,v 2.19 2024/06/03 18:55:51 greg Exp $"; #endif /* * General component matrix combiner, operating on a row at a time. + * + * Multi-processing mode under Unix creates children that each work + * on one input row at a time, fed by the original process. Final conversion + * and output to stdout is sorted by first child while its siblings send it + * their record calculations. */ -#include #include #include "platform.h" #include "rtprocess.h" @@ -57,17 +61,9 @@ int cur_chan; /* if we're looping channels */ SUBPROC *cproc = NULL; /* child process array */ int nchildren = 0; /* # of child processes */ int inchild = -1; /* our child ID (-1: parent) */ -int pgid = -1; /* process group ID */ -int nr_out = 0; /* # of rows output by kids */ static int checksymbolic(ROPMAT *rop); -static void -on_sigio(int dummy) -{ - nr_out++; /* happens when child outputs row */ -} - static int split_input(ROPMAT *rop) { @@ -549,9 +545,25 @@ output_headinfo(FILE *fp) } } +static void +reset_inputs(void) +{ + int i = nmats; + + fpurge(stdin); /* discard previous matrix input */ + while (i--) { + if (mop[i].infp != stdin) + fclose(mop[i].infp); /* ! pclose() */ + mop[i].infp = stdin; + mop[i].imx.dtype = DTdouble; + mop[i].imx.pflags &= ~RMF_SWAPIN; + } +} + static int spawned_children(int np) { + size_t recsize = 0; int i, rv; #if defined(_WIN32) || defined(_WIN64) @@ -560,13 +572,14 @@ spawned_children(int np) np = 1; } else #endif - if ((in_nrows > 0) & (np > in_nrows)) - np = in_nrows; + if ((in_nrows > 0) & (np*4 > in_nrows)) + np = in_nrows/4; /* we'll be doing a row at a time */ for (i = 0; i < nmats; i++) { mop[i].imx.nrows = 1; if (!rmx_prepare(&mop[i].imx)) goto memerror; + recsize += rmx_array_size(&mop[i].imx); if (mop[i].rmp != &mop[i].imx) { mop[i].rmp->nrows = 1; if (!rmx_prepare(mop[i].rmp)) @@ -587,7 +600,7 @@ spawned_children(int np) mop[nmats].imx.nrows = 1; if (!rmx_prepare(&mop[nmats].imx)) goto memerror; - if (np <= 1) { /* single process return point */ + if (np <= 1) { /* single process return */ #ifdef getc_unlocked for (i = 0; i < nmats; i++) flockfile(mop[i].infp); @@ -595,40 +608,49 @@ spawned_children(int np) #endif return(0); } - pgid = setpgrp(); /* set process group ID */ - signal(SIGIO, on_sigio); fflush(stdout); /* flush header & spawn children */ - cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np); + nchildren = np+1; + cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren); if (!cproc) goto memerror; - nchildren = np; - for (i = 0; i < np; i++) { - cproc[i].flags = PF_FILT_OUT; - cproc[i].w = dup(1); + /* first child becomes stdout filter */ + cproc[0].flags = PF_FILT_OUT; + cproc[0].w = fileno(stdout); + cproc[0].r = 0; + cproc[0].pid = -1; + rv = open_process(&cproc[i], NULL); + if (rv < 0) { + perror("fork"); + exit(1); + } + if (rv == 0) { /* loop if first child */ + inchild = 0; + reset_inputs(); + output_loop(); + _exit(0); + } + /* start other children */ + for (i = 1; (i < nchildren) & (rv > 0); i++) { + cproc[i].flags = 0; + cproc[i].w = 1; cproc[i].r = 0; cproc[i].pid = -1; rv = open_process(&cproc[i], NULL); - if (rv <= 0) break; } - if (rv > 0) - return(1); /* parent return value */ if (rv < 0) { perror("fork"); + close_processes(cproc, i); exit(1); } + if (rv > 0) /* parent return? */ + return(1); inchild = i; /* our child index */ - while (i-- > 0) /* don't share siblings' pipes */ + while (--i > 0) /* don't share siblings' pipes */ close(cproc[i].w); - fpurge(stdin); /* discard previous matrix input */ + reset_inputs(); #ifdef getc_unlocked flockfile(stdin); #endif - for (i = 0; i < nmats; i++) { - if (mop[i].infp != stdin) - fclose(mop[i].infp); /* ! pclose() */ - mop[i].infp = stdin; - mop[i].imx.dtype = DTdouble; - } return(0); /* child return */ memerror: fputs("Out of memory in spawned_children()\n", stderr); @@ -636,25 +658,20 @@ memerror: } static int -parent_loop() +parent_loop(void) { - FILE **outfp = (FILE **)malloc(nchildren*sizeof(FILE *)); int i; - if (!outfp) goto memerror; - for (i = 0; i < nchildren; i++) { - outfp[i] = fdopen(cproc[i].w, "w"); - if (!outfp[i]) goto memerror; + rmx_reset(&mop[nmats].imx); /* not doing output side */ + if (mop[nmats].rmp != &mop[nmats].imx) + rmx_reset(mop[nmats].rmp); #ifdef getc_unlocked - flockfile(outfp[i]); -#endif - } -#ifdef getc_unlocked for (i = 0; i < nmats; i++) flockfile(mop[i].infp); #endif + /* load & send rows to kids */ for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) { - FILE *ofp = outfp[cur_row % nchildren]; + int wfd = cproc[1 + cur_row%(nchildren-1)].w; for (i = 0; i < nmats; i++) if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) { if (cur_row > in_nrows) /* unknown #input rows? */ @@ -666,15 +683,10 @@ parent_loop() if (i < nmats) break; for (i = 0; i < nmats; i++) - if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp, - mop[i].imx.ncols, DTdouble, ofp)) - return(0); - if (fflush(ofp) == EOF) - return(0); + if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx)) + != rmx_array_size(&mop[i].imx)) + return(0); } - for (i = 0; i < nchildren; i++) - fclose(outfp[i]); - free(outfp); i = close_processes(cproc, nchildren); free(cproc); cproc = NULL; if (i < 0) { @@ -692,15 +704,14 @@ memerror: } static int -combine_input() +combine_input(void) { - const int row0 = (inchild >= 0)*inchild; + const int row0 = (inchild > 0)*(inchild-1); const int rstep = nchildren + !nchildren; ROPMAT *res = &mop[nmats]; int set_r, set_c; RMATRIX *tmp = NULL; int co_set; - sigset_t iomask; int i; if (mcat && mcat_last && @@ -720,13 +731,10 @@ combine_input() set_c = varlookup("c") != NULL && !vardefined("c"); } else /* save a little time */ set_r = set_c = 0; - - sigemptyset(&iomask); /* read/process row-by-row */ - sigaddset(&iomask, SIGIO); + /* read/process row-by-row */ for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) { RMATRIX *mres = NULL; - for (i = 0; i < nmats; i++) { - if (inchild >= 0) sigprocmask(SIG_BLOCK, &iomask, NULL); + for (i = 0; i < nmats; i++) if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) { if (cur_row > in_nrows) /* unknown #input rows? */ break; @@ -734,8 +742,6 @@ combine_input() mop[i].inspec, cur_row); return(0); } - if (inchild >= 0) sigprocmask(SIG_UNBLOCK, &iomask, NULL); - } if (i < nmats) break; for (i = 0; i < nmats; i++) @@ -777,22 +783,17 @@ combine_input() return(0); } rmx_free(mres); mres = NULL; - if (inchild >= 0) { /* children share stdout */ - while (nr_out < cur_row) - pause(); /* wait for our turn */ - sigprocmask(SIG_BLOCK, &iomask, NULL); + if (inchild > 0) { /* children share stdout */ + i = getc(stdin); /* signals it's our turn */ + if (i != EOF) ungetc(i, stdin); } if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp, res->rmp->ncols, res->rmp->dtype, stdout)) return(0); - if (inchild >= 0) { /* flush and notify group */ - if (fflush(stdout) == EOF) - return(0); - sigprocmask(SIG_UNBLOCK, &iomask, NULL); - killpg(pgid, SIGIO); /* increments everyone's nr_out */ - } + if (inchild > 0 && fflush(stdout) == EOF) + return(0); } - return(inchild >= 0 || fflush(stdout) != EOF); + return(inchild > 0 || fflush(stdout) != EOF); memerror: fputs("Out of buffer space in combine_input()\n", stderr); return(0); @@ -923,6 +924,7 @@ main(int argc, char *argv[]) if (n && !isflt(argv[i+1])) { mop[nmats].preop.csym = argv[++i]; mop[nmats].preop.clen = 0; + mcat_last = 0; break; } if (n > MAXCOMP*MAXCOMP) n = MAXCOMP*MAXCOMP;