--- ray/src/rt/RcontribSimulManager.cpp 2025/10/16 23:14:31 2.13 +++ ray/src/rt/RcontribSimulManager.cpp 2025/10/17 01:15:53 2.14 @@ -1,5 +1,5 @@ #ifndef lint -static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.13 2025/10/16 23:14:31 greg Exp $"; +static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.14 2025/10/17 01:15:53 greg Exp $"; #endif /* * RcontribSimulManager.cpp @@ -52,6 +52,8 @@ struct RowAssignment { uint32 ac; // accumulation count }; +static const char ROW_DONE[] = "ROW FINISHED\n"; + // Get format identifier const char * formstr(int f) @@ -625,8 +627,8 @@ RcontribSimulManager::ComputeRecord(const FVECT orig_d return 0; } if (nkids > 0) { // in parent process? - int k = GetChild(); // updates output rows - if (k < 0) return -1; // can't really happen + int k = GetChild(false); // updates output rows + if (k < 0) return -1; // someone died? RowAssignment rass; rass.row = kidRow[k] = rInPos++; rass.ac = accum; @@ -671,35 +673,40 @@ RcontribSimulManager::GetChild(bool forceWait) return -1; // take inventory int pn, n = 0; - fd_set writeset, errset; - FD_ZERO(&writeset); FD_ZERO(&errset); + fd_set readset, errset; + FD_ZERO(&readset); FD_ZERO(&errset); for (pn = nkids; pn--; ) { if (kidRow[pn] < 0) { // child already ready? if (forceWait) continue; return pn; // good enough } - FD_SET(kid[pn].w, &writeset); // will check on this one - FD_SET(kid[pn].w, &errset); - if (kid[pn].w >= n) - n = kid[pn].w + 1; + FD_SET(kid[pn].r, &readset); // will check on this one + FD_SET(kid[pn].r, &errset); + if (kid[pn].r >= n) + n = kid[pn].r + 1; } if (!n) // every child is idle? return -1; // wait on "busy" child(ren) - while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0) + while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0) if (errno != EINTR) { error(SYSTEM, "select call failed in GetChild()"); return -1; } + char buf[sizeof(ROW_DONE)] = "X"; pn = -1; // get flags set by select for (n = nkids; n--; ) if (kidRow[n] >= 0 && - FD_ISSET(kid[n].w, &writeset) | - FD_ISSET(kid[n].w, &errset)) { + FD_ISSET(kid[n].r, &readset) | + FD_ISSET(kid[n].r, &errset)) { + // check for error + if (FD_ISSET(kid[n].r, &errset) || + read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 || + memcmp(buf, ROW_DONE, sizeof(ROW_DONE))) + return -1; // update output row counts - if (!FD_ISSET(kid[n].w, &errset)) - UpdateRowsDone(kidRow[n]); - kidRow[n] = -1; // flag it available + UpdateRowsDone(kidRow[n]); + kidRow[n] = -1; // flag child available pn = n; } return pn; @@ -750,6 +757,9 @@ RcontribSimulManager::RunChild() if (ComputeRecord(vecList) <= 0) exit(1); + // signal this row is done + if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE)) + exit(1); } if (nr) { error(SYSTEM, "read error in child process"); @@ -775,12 +785,12 @@ RcontribSimulManager::StartKids(int n2go) fflush(stdout); // shouldn't use, anyway while (nkids < n2go) { kid[nkids] = sp_inactive; - kid[nkids].w = dup(1); - kid[nkids].flags |= PF_FILT_OUT; int rv = open_process(&kid[nkids], NULL); if (!rv) { // in child process? - while (nkids-- > 0) + while (nkids-- > 0) { + close(kid[nkids].r); close(kid[nkids].w); + } free(kid); free(kidRow); kid = NULL; kidRow = NULL; RunChild(); // should never return