rcomb.c 2.18 -> 2.19 (CVS unified diff)

What the hunks below do:
  * cproc[] grows to np+1 entries.  Slot 0 is opened with PF_FILT_OUT; in
    that child the code calls reset_inputs(), output_loop() and _exit(0).
  * Slots 1..np are opened with flags 0.  parent_loop() sends row cur_row to
    slot 1 + cur_row%(nchildren-1) using writebuf() on the pipe descriptor
    instead of a stdio stream, and no longer sleeps between pipe closures.
  * The stdin/mop[] reset sequence moves out of spawned_children() into the
    new reset_inputs() helper.
  * parent_loop() and combine_input() are declared with (void).

Review fix applied in this copy:
  * The first open_process() call was given &cproc[i].  The four assignments
    just before it initialise cproc[0], and nothing in the visible lines
    sets i to 0 before the call, so it now passes &cproc[0].

NOTE(review) -- left untouched, please confirm against the full file:
  * The new "for (i = 1; ...; i++)" loop has no break, so a forked child
    (rv == 0) leaves it with i already incremented past its own slot.
    "inchild = i" and "while (--i > 0) close(cproc[i].w)" should be
    rechecked with that in mind.
  * combine_input() advances by rstep = nchildren, whereas parent_loop()
    deals rows modulo nchildren-1; verify that the two agree.

Indentation below is reconstructed and approximate (apply with "patch -l").
The operand of the "#include" context line was not available and is left
as found.

--- ray/src/util/rcomb.c	2024/05/23 19:29:41	2.18
+++ ray/src/util/rcomb.c	2024/06/03 18:55:51	2.19
@@ -1,19 +1,13 @@
 #ifndef lint
-static const char RCSid[] = "$Id: rcomb.c,v 2.18 2024/05/23 19:29:41 greg Exp $";
+static const char RCSid[] = "$Id: rcomb.c,v 2.19 2024/06/03 18:55:51 greg Exp $";
 #endif
 /*
  * General component matrix combiner, operating on a row at a time.
  *
  * Multi-processing mode under Unix creates children that each work
- * on one input row at a time, fed by the single parent.  Output to
- * a shared stdout is assured because each child waits for parent
- * to be ready with next input row before it outputs the previous result.
- * The assumption (checked in spawn_children()) is that the parent
- * will remain blocked until the child it is feeding has finished
- * flushing the previous row to stdout.  The final output row is
- * less guaranteed to be in order, so the parent sleeps for a few seconds
- * between each child pipe closure, in hopes that this allows enough
- * time for row output to finish in each process.
+ * on one input row at a time, fed by the original process.  Final conversion
+ * and output to stdout is sorted by first child while its siblings send it
+ * their record calculations.
  */
 
 #include
@@ -551,6 +545,21 @@ output_headinfo(FILE *fp)
 	}
 }
 
+static void
+reset_inputs(void)
+{
+	int	i = nmats;
+
+	fpurge(stdin);		/* discard previous matrix input */
+	while (i--) {
+		if (mop[i].infp != stdin)
+			fclose(mop[i].infp);	/* ! pclose() */
+		mop[i].infp = stdin;
+		mop[i].imx.dtype = DTdouble;
+		mop[i].imx.pflags &= ~RMF_SWAPIN;
+	}
+}
+
 static int
 spawned_children(int np)
 {
@@ -600,23 +609,33 @@ spawned_children(int np)
 		return(0);
 	}
 	fflush(stdout);			/* flush header & spawn children */
-	cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*np);
+	nchildren = np+1;
+	cproc = (SUBPROC *)malloc(sizeof(SUBPROC)*nchildren);
 	if (!cproc)
 		goto memerror;
-	nchildren = np;
-	for (i = 0; i < np; i++) {
-		cproc[i].flags = PF_FILT_OUT;
-		cproc[i].w = dup(1);
+					/* first child becomes stdout filter */
+	cproc[0].flags = PF_FILT_OUT;
+	cproc[0].w = fileno(stdout);
+	cproc[0].r = 0;
+	cproc[0].pid = -1;
+	rv = open_process(&cproc[0], NULL);
+	if (rv < 0) {
+		perror("fork");
+		exit(1);
+	}
+	if (rv == 0) {			/* loop if first child */
+		inchild = 0;
+		reset_inputs();
+		output_loop();
+		_exit(0);
+	}
+					/* start other children */
+	for (i = 1; (i < nchildren) & (rv > 0); i++) {
+		cproc[i].flags = 0;
+		cproc[i].w = 1;
 		cproc[i].r = 0;
 		cproc[i].pid = -1;
 		rv = open_process(&cproc[i], NULL);
-		if (rv <= 0) break;
-		if (!i && 2*rv >= recsize) {
-			fputs("Problem too small for multi-processing\n",
-					stderr);
-			close_processes(cproc, 1);
-			exit(1);
-		}
 	}
 	if (rv < 0) {
 		perror("fork");
@@ -626,19 +645,12 @@ spawned_children(int np)
 	if (rv > 0)			/* parent return? */
 		return(1);
 	inchild = i;			/* our child index */
-	while (i-- > 0)			/* don't share siblings' pipes */
+	while (--i > 0)			/* don't share siblings' pipes */
 		close(cproc[i].w);
-	fpurge(stdin);			/* discard previous matrix input */
+	reset_inputs();
 #ifdef getc_unlocked
 	flockfile(stdin);
 #endif
-	for (i = 0; i < nmats; i++) {
-		if (mop[i].infp != stdin)
-			fclose(mop[i].infp);	/* ! pclose() */
-		mop[i].infp = stdin;
-		mop[i].imx.dtype = DTdouble;
-		mop[i].imx.pflags &= ~RMF_SWAPIN;
-	}
 	return(0);			/* child return */
 memerror:
 	fputs("Out of memory in spawned_children()\n", stderr);
@@ -646,25 +658,20 @@ memerror:
 }
 
 static int
-parent_loop()
+parent_loop(void)
 {
-	FILE	**outfp = (FILE **)malloc(nchildren*sizeof(FILE *));
 	int	i;
 
-	if (!outfp) goto memerror;
-	for (i = 0; i < nchildren; i++) {
-		outfp[i] = fdopen(cproc[i].w, "w");
-		if (!outfp[i]) goto memerror;
+	rmx_reset(&mop[nmats].imx);		/* not doing output side */
+	if (mop[nmats].rmp != &mop[nmats].imx)
+		rmx_reset(mop[nmats].rmp);
 #ifdef getc_unlocked
-		flockfile(outfp[i]);
-#endif
-	}
-#ifdef getc_unlocked
 	for (i = 0; i < nmats; i++)
 		flockfile(mop[i].infp);
 #endif
+					/* load & send rows to kids */
 	for (cur_row = 0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row++) {
-		FILE	*ofp = outfp[cur_row % nchildren];
+		int	wfd = cproc[1 + cur_row%(nchildren-1)].w;
 		for (i = 0; i < nmats; i++)
 			if (!rmx_load_row(mop[i].imx.mtx, &mop[i].imx, mop[i].infp)) {
 				if (cur_row > in_nrows)	/* unknown #input rows? */
@@ -676,17 +683,10 @@ parent_loop()
 		if (i < nmats)
 			break;
 		for (i = 0; i < nmats; i++)
-			if (!rmx_write_data(mop[i].imx.mtx, mop[i].imx.ncomp,
-					mop[i].imx.ncols, DTdouble, ofp))
-				return(0);
-		if (fflush(ofp) == EOF)
-			return(0);
+			if (writebuf(wfd, mop[i].imx.mtx, rmx_array_size(&mop[i].imx))
+					!= rmx_array_size(&mop[i].imx))
+				return(0);
 	}
-	for (i = 0; i < nchildren; i++) {	/* maintain output order */
-		sleep(1+(mop[nmats].rmp->ncols*mop[nmats].rmp->ncomp >> 15));
-		fclose(outfp[i]);
-	}
-	free(outfp);
 	i = close_processes(cproc, nchildren);
 	free(cproc); cproc = NULL;
 	if (i < 0) {
@@ -704,9 +704,9 @@ memerror:
 }
 
 static int
-combine_input()
+combine_input(void)
 {
-	const int	row0 = (inchild >= 0)*inchild;
+	const int	row0 = (inchild > 0)*(inchild-1);
 	const int	rstep = nchildren + !nchildren;
 	ROPMAT		*res = &mop[nmats];
 	int		set_r, set_c;
@@ -731,7 +731,6 @@ combine_input()
 		set_c = varlookup("c") != NULL && !vardefined("c");
 	} else				/* save a little time */
 		set_r = set_c = 0;
-					/* read/process row-by-row */
 	for (cur_row = row0; (in_nrows <= 0) | (cur_row < in_nrows); cur_row += rstep) {
 		RMATRIX	*mres = NULL;
 
@@ -784,17 +783,17 @@ combine_input()
 			return(0);
 		}
 		rmx_free(mres); mres = NULL;
-		if (inchild >= 0) {		/* children share stdout */
+		if (inchild > 0) {		/* children share stdout */
 			i = getc(stdin);	/* signals it's our turn */
 			if (i != EOF) ungetc(i, stdin);
 		}
 		if (!rmx_write_data(res->rmp->mtx, res->rmp->ncomp,
 				res->rmp->ncols, res->rmp->dtype, stdout))
 			return(0);
-		if (inchild >= 0 && fflush(stdout) == EOF)
+		if (inchild > 0 && fflush(stdout) == EOF)
 			return(0);
 	}
-	return(inchild >= 0 || fflush(stdout) != EOF);
+	return(inchild > 0 || fflush(stdout) != EOF);
 memerror:
 	fputs("Out of buffer space in combine_input()\n", stderr);
 	return(0);