ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.3
Committed: Wed Oct 30 01:38:21 2024 UTC (6 months, 2 weeks ago) by greg
Branch: MAIN
Changes since 2.2: +5 -4 lines
Log Message:
fix(rxcontrib): Fixes in file loading & process management

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.3 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.2 2024/10/29 19:47:19 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27 greg 2.2 int contrib = 0; // computing contributions?
28    
29     int xres = 0; // horizontal (scan) size
30     int yres = 0; // vertical resolution
31 greg 2.1
32     // new/exclusive, overwrite if exists, or recover data
33     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
34     RDSread|RDSwrite};
35    
36     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
37     #define LNROWSTR 6
38    
39     // Modifier channel for recording contributions (no constructor/destructor)
40     struct RcontribMod {
41     RcontribOutput * opl; // pointer to first output channel
42     char * params; // parameters string
43     EPNODE * binv; // bin expression
44     int nbins; // bin count this modifier
45     int coffset; // column offset in bytes
46     DCOLORV cbin[1]; // bin accumulator (extends struct)
47     /// Access specific (spectral) color bin
48     DCOLORV * operator[](int n) {
49     if ((n < 0) | (n >= nbins)) return NULL;
50     return cbin + n*NCSAMP;
51     }
52     };
53    
54 greg 2.2 // Struct used to assign record calculation to child
55 greg 2.1 struct RowAssignment {
56     uint32 row; // row to do
57     uint32 ac; // accumulation count
58     };
59    
60     // Our default data share function
61     RdataShare *
62     defDataShare(const char *name, RCOutputOp op, size_t siz)
63     {
64     return new RdataShareMap(name, RSDOflags[op], siz);
65     }
66    
67     // Allocate rcontrib accumulator
68     RcontribMod *
69     NewRcMod(const char *prms, const char *binexpr, int ncbins)
70     {
71     if (ncbins <= 0) return NULL;
72     if (!prms) prms = "";
73     if (!binexpr | (ncbins == 1))
74     binexpr = "0";
75    
76     RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
77     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
78     strlen(prms)+1);
79    
80     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
81     mp->binv = eparse(const_cast<char *>(binexpr));
82     mp->nbins = ncbins;
83     return mp;
84     }
85    
86     // Free an RcontribMod
87     void
88     FreeRcMod(void *p)
89     {
90     if (!p) return;
91     epfree((*(RcontribMod *)p).binv, true);
92     efree(p);
93     }
94    
95     // Set output format ('f', 'd', or 'c')
96     bool
97     RcontribSimulManager::SetDataFormat(int ty)
98     {
99     if (outList) {
100     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
101     return false;
102     }
103     switch (ty) {
104     case 'f':
105     dsiz = sizeof(float)*NCSAMP;
106     break;
107     case 'd':
108     dsiz = sizeof(double)*NCSAMP;
109     break;
110     case 'c':
111     dsiz = LSCOLR;
112     break;
113     default:
114     sprintf(errmsg, "unsupported output format '%c'", ty);
115     error(INTERNAL, errmsg);
116     return false;
117     }
118     dtyp = ty;
119     return true;
120     }
121    
122     // static call-back for rcontrib ray-tracing
123     int
124     RcontribSimulManager::RctCall(RAY *r, void *cd)
125     {
126     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
127     return 0;
128     // shadow ray not on source?
129     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
130     return 0;
131    
132     const char * mname = objptr(r->ro->omod)->oname;
133     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
134     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
135     if (!mp)
136     return 0; // not in our modifier list
137    
138     worldfunc(RCCONTEXT, r); // compute bin #
139     set_eparams(mp->params);
140     double bval = evalue(mp->binv);
141     if (bval <= -.5)
142     return 0; // silently ignore negative bin index
143     DCOLORV * dvp = (*mp)[int(bval + .5)];
144     if (!dvp) {
145     sprintf(errmsg, "bad bin number for '%s' (%.1f ignored)", mname, bval);
146     error(WARNING, errmsg);
147     return 0;
148     }
149     SCOLOR contr;
150     raycontrib(contr, r, PRIMARY); // compute coefficient
151     if (contrib)
152     smultscolor(contr, r->rcol); // -> value contribution
153     for (int i = 0; i < NCSAMP; i++)
154     *dvp++ += contr[i]; // accumulate color/spectrum
155     return 1;
156     }
157    
158     // check for given format type in string, return last char position
159     static int
160     hasFormat(const char *tst, const char *match)
161     {
162     static const char allPoss[] = "%diouxXfFeEgGaAcsb";
163     const char * s = tst;
164     while (*s) {
165     if (*s++ != '%')
166     continue;
167     while (!strchr(allPoss, *s))
168     s++;
169     if (strchr(match, *s))
170     break;
171     s++;
172     }
173     return (*s != '\0')*(s - tst);
174     }
175    
176     // Add a modifier and arguments, opening output file(s)
177     bool
178     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
179     const char *prms, const char *binval, int bincnt)
180     {
181     if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
182     error(WARNING, "ignoring bad call to AddModifier()");
183     return false;
184     }
185     if (!nChan) { // initial call?
186     if (!SetDataFormat(dtyp))
187     return false;
188     nChan = NCSAMP;
189     } else if (nChan != NCSAMP) {
190     error(INTERNAL, "number of spectral channels must be fixed");
191     return false;
192     }
193     if (Ready()) {
194     error(INTERNAL, "call to AddModifier() after PrepOutput()");
195     return false;
196     }
197     LUENT * lp = lu_find(&modLUT, modn);
198     if (lp->data) {
199     sprintf(errmsg, "duplicate modifier '%s'", modn);
200     error(USER, errmsg);
201     return false;
202     }
203     if (!lp->key) // create new entry
204     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
205    
206     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
207     if (!mp) return false;
208     lp->data = (char *)mp;
209     const int modndx = hasFormat(outspec, "sb");
210     const int binndx = hasFormat(outspec, "diouxX");
211     int bin0 = 0;
212     char fnbuf[512];
213     if (!modndx | (modndx > binndx))
214     sprintf(fnbuf, outspec, bin0, modn);
215     else
216     sprintf(fnbuf, outspec, modn, bin0);
217     RcontribOutput * olast = NULL;
218     RcontribOutput * op;
219     for (op = outList; op; op = (olast=op)->next) {
220     if (strcmp(op->GetName(), fnbuf))
221     continue;
222     if (modndx) { // this ain't right
223     sprintf(errmsg, "output name collision for '%s'", fnbuf);
224     error(USER, errmsg);
225     return false;
226     }
227     if ((binndx > 0) & (xres > 0)) {
228     sprintf(fnbuf, outspec, ++bin0);
229     continue; // each bin goes to another image
230     }
231     mp->coffset = op->rowBytes; // else add to what's there
232     break;
233     }
234     if (!op) { // create new output channel?
235     op = new RcontribOutput(fnbuf);
236     if (olast) olast->next = op;
237     else outList = op;
238     }
239     mp->opl = op; // first (maybe only) output channel
240     if (modndx) // remember modifier if part of name
241     op->omod = lp->key;
242     if (binndx > 0) { // append output image/bin list
243     op->rowBytes += dsiz;
244     op->obin = bin0;
245     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
246     if (!modndx | (modndx > binndx))
247     sprintf(fnbuf, outspec, bin0+bincnt, modn);
248     else
249     sprintf(fnbuf, outspec, modn, bin0+bincnt);
250     if (!op->next) {
251     olast = op;
252     olast->next = op = new RcontribOutput(fnbuf);
253     if (modndx) op->omod = lp->key;
254     op->obin = bin0+bincnt;
255     } else {
256     op = op->next;
257     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
258     "bin number mismatch in AddModifier()");
259     }
260     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
261     "row offset mismatch in AddModifier()");
262     op->rowBytes += dsiz;
263     }
264     } else // else send all results to this channel
265     op->rowBytes += bincnt*dsiz;
266     return true;
267     }
268    
269     // Add a file of modifiers with associated arguments
270     bool
271     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
272     const char *prms,
273     const char *binval, int bincnt)
274     {
275     char * path = getpath(const_cast<char *>(modfn),
276     getrlibpath(), R_OK);
277     FILE * fp;
278     if (!path || !(fp = fopen(path, "r"))) {
279     sprintf(errmsg, "cannot %s modifier file '%s'",
280     path ? "open" : "find", modfn);
281     error(SYSTEM, errmsg);
282     return false;
283     }
284     char mod[MAXSTR];
285     while (fgetword(mod, sizeof(mod), fp))
286     if (!AddModifier(mod, outspec, prms, binval, bincnt))
287     return false;
288     fclose(fp);
289     return true;
290     }
291    
292     // Prepare output channels and return # completed rows
293     int
294     RcontribSimulManager::PrepOutput()
295     {
296     if (!outList || !RtraceSimulManager::Ready()) {
297     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
298     return -1;
299     }
300     int remWarnings = 20;
301     for (RcontribOutput *op = outList; op; op = op->next) {
302     if (op->rData) {
303     error(INTERNAL, "output channel already open in PrepOutput()");
304     return -1;
305     }
306     op->nRows = yres * (xres + !xres);
307     op->rData = (*cdsF)(op->ofname, outOp,
308     GetHeadLen()+1024 + op->nRows*op->rowBytes);
309     freeqstr(op->ofname); op->ofname = NULL;
310     if (outOp == RCOrecover) {
311     int rd = op->CheckHeader(this);
312     if (rd < 0)
313     return -1;
314     if (rd >= op->nRows) {
315     if (remWarnings >= 0) {
316     sprintf(errmsg, "recovered output '%s' already done",
317     op->GetName());
318     error(WARNING, remWarnings ? errmsg : "etc...");
319     remWarnings--;
320     }
321     rd = op->nRows;
322     }
323     if (!rInPos | (rInPos > rd)) rInPos = rd;
324     } else if (!op->NewHeader(this))
325     return -1;
326     // make sure there's room
327     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
328     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
329     return -1; // calls error() for us
330     }
331     rowsDone.NewBitMap(outList->nRows); // create row completion map
332     rowsDone.ClearBits(0, rInPos, true);
333     return rInPos;
334     }
335    
336     // Create header in open write-only channel
337     bool
338     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
339     {
340     int headlim = rcp->GetHeadLen() + 1024;
341     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
342     int esiz;
343     const int etyp = rcp->GetFormat(&esiz);
344    
345     strcpy(hdr, HDRSTR);
346     strcat(hdr, "RADIANCE\n");
347     begData = strlen(hdr);
348     strcpy(hdr+begData, rcp->GetHeadStr());
349     begData += rcp->GetHeadLen();
350     if (omod) {
351     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
352     begData += strlen(hdr+begData);
353     }
354     if (obin >= 0) {
355     sprintf(hdr+begData, "BIN=%d\n", obin);
356     begData += strlen(hdr+begData);
357     }
358     strcpy(hdr+begData, ROWZEROSTR);
359     rowCountPos = begData+LNROWSTR;
360     begData += sizeof(ROWZEROSTR)-1;
361     if (!xres | (rowBytes > esiz)) {
362     sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
363     begData += strlen(hdr+begData);
364     }
365     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
366     begData += strlen(hdr+begData);
367     if (NCSAMP > 3) {
368     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
369     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
370     begData += strlen(hdr+begData);
371     }
372     if (etyp != 'c') {
373     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
374     begData += strlen(hdr+begData);
375     }
376     int align = 0;
377     switch (etyp) {
378     case 'f':
379     align = sizeof(float);
380     break;
381     case 'd':
382     align = sizeof(double);
383     break;
384     case 'c':
385     break;
386     default:
387     error(INTERNAL, "unsupported data type in NewHeader()");
388     return false;
389     }
390     strcpy(hdr+begData, FMTSTR); // write format string
391     begData += strlen(hdr+begData);
392     strcpy(hdr+begData, formstr(etyp));
393     begData += strlen(hdr+begData);
394     if (align) // align data at end of header
395     while ((begData+2) % align)
396     hdr[begData++] = ' ';
397     hdr[begData++] = '\n'; // EOL for data format
398     hdr[begData++] = '\n'; // end of nominal header
399     if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
400     sprintf(hdr+begData, PIXSTDFMT, yres, xres);
401     begData += strlen(hdr+begData);
402     }
403     return rData->ReleaseMemory(hdr, RDSwrite);
404     }
405    
406     // find string argument in header for named variable
407     static const char *
408     findArgs(const char *hdr, const char *vnm, int len)
409     {
410     const char * npos = strnstr(hdr, vnm, len);
411    
412     if (!npos) return NULL;
413    
414     npos += strlen(vnm); // find start of (first) argument
415     len -= npos - hdr;
416     while (len-- > 0 && (*npos == '=') | isspace(*npos))
417     if (*npos++ == '\n')
418     return NULL;
419    
420     return (len >= 0) ? npos : NULL;
421     }
422    
423     // Load and check header in read/write channel
424     int
425     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
426     {
427     int esiz;
428     const int etyp = rcp->GetFormat(&esiz);
429     const int maxlen = rcp->GetHeadLen() + 1024;
430     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
431     const char * cp;
432     // find end of header
433     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
434     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
435     error(USER, errmsg);
436     return -1;
437     }
438     begData = cp - hdr + 1; // increment again at end
439     // check # components
440     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
441     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
442     error(USER, errmsg);
443     return -1;
444     }
445     // check format
446 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
447     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
448 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
449     error(USER, errmsg);
450     return -1;
451     }
452     // check #columns
453     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
454     !xres | (rowBytes > esiz)) {
455     sprintf(errmsg, "expected NCOLS=%d in '%s'",
456     int(rowBytes/esiz), GetName());
457     error(USER, errmsg);
458     return -1;
459     }
460     // find row count
461     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
462     sprintf(errmsg, "missing NROWS in '%s'", GetName());
463     error(USER, errmsg);
464     return -1;
465     }
466     rowCountPos = cp - hdr;
467     int rlast = atoi(cp);
468     begData++; // advance past closing EOL
469     if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
470     char rbuf[64];
471     sprintf(rbuf, PIXSTDFMT, yres, xres);
472     int rlen = strlen(rbuf);
473     if (strncmp(rbuf, hdr+begData, rlen)) {
474     sprintf(errmsg, "bad resolution string in '%s'", GetName());
475     error(USER, errmsg);
476     return -1;
477     }
478     begData += rlen;
479     }
480     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
481     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
482     }
483    
484     // Rewind calculation (previous results unchanged)
485     bool
486     RcontribSimulManager::ResetRow(int r)
487     {
488     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
489     error(WARNING, "ignoring bad call to ResetRow()");
490     return (r == rInPos);
491     }
492     FlushQueue(); // finish current and reset
493     for (RcontribOutput *op = outList; op; op = op->next)
494     if (!op->SetRowsDone(r))
495     return false;
496    
497     rowsDone.ClearBits(r, rInPos-r, false);
498     rInPos = r;
499     return true;
500     }
501    
502     // call-back for averaging each modifier's contributions to assigned channel(s)
503     static int
504     putModContrib(const LUENT *lp, void *p)
505     {
506     RcontribMod * mp = (RcontribMod *)lp->data;
507     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
508     const double sca = 1./rcp->accum;
509     const DCOLORV * dvp = mp->cbin;
510     RcontribOutput * op = mp->opl;
511     int i, n;
512    
513     switch (rcp->GetFormat()) { // conversion based on output type
514     case 'd': {
515     double * dvo = (double *)op->InsertionP(mp->coffset);
516     for (n = mp->nbins; n--; ) {
517     for (i = 0; i < NCSAMP; i++)
518     *dvo++ = *dvp++ * sca;
519     if ((op->obin >= 0) & (n > 0))
520     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
521     }
522     } break;
523     case 'f': {
524     float * fvo = (float *)op->InsertionP(mp->coffset);
525     for (n = mp->nbins; n--; ) {
526     for (i = 0; i < NCSAMP; i++)
527     *fvo++ = float(*dvp++ * sca);
528     if ((op->obin >= 0) & (n > 0))
529     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
530     }
531     } break;
532     case 'c': {
533     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
534     for (n = mp->nbins; n--; ) {
535     SCOLOR scol;
536     for (i = 0; i < NCSAMP; i++)
537     scol[i] = COLORV(*dvp++ * sca);
538     scolor_scolr(cvo, scol);
539     cvo += LSCOLR;
540     if ((op->obin >= 0) & (n > 0))
541     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
542     }
543     } break;
544     default:
545     error(CONSISTENCY, "unsupported output type in sendModContrib()");
546     return -1;
547     }
548     // clear for next tally
549     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
550     return 1;
551     }
552    
553     // Add a ray/bundle to compute next record
554     int
555     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
556     {
557     if (!Ready())
558     return 0;
559     if (rInPos >= outList->nRows) {
560     error(WARNING, "ComputeRecord() called after last record");
561     return 0;
562     }
563     if (nkids > 0) { // in parent process?
564     int k = GetChild(); // updates output rows
565     if (k < 0) return -1; // can't really happen
566     RowAssignment rass;
567     rass.row = kidRow[k] = rInPos++;
568     rass.ac = accum;
569     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
570     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
571     != sizeof(FVECT)*2*accum) {
572     error(SYSTEM, "cannot write to child; dead process?");
573     return -1;
574     }
575     return accum; // tracing/output happens in child
576     }
577     if (NCSAMP != nChan) {
578     error(INTERNAL, "number of color channels must remain fixed");
579     return -1;
580     }
581     // actual work is done here...
582     if (EnqueueBundle(orig_direc, accum) < 0)
583     return -1;
584    
585     RcontribOutput * op; // prepare output buffers
586     for (op = outList; op; op = op->next)
587     if (!op->GetRow(rInPos))
588     return -1;
589     // convert averages & clear
590     if (lu_doall(&modLUT, putModContrib, this) < 0)
591     return -1;
592     // write buffers
593     for (op = outList; op; op = op->next)
594     op->DoneRow();
595    
596     if (!nkids) // update row if solo process
597     UpdateRowsDone(rInPos++); // XXX increment here is critical
598    
599     return accum;
600     }
601    
602     // Get next available child, returning index or -1 if forceWait & idling
603     int
604     RcontribSimulManager::GetChild(bool forceWait)
605     {
606     if (nkids <= 0)
607     return -1;
608     // take inventory
609     int pn, n = 0;
610     fd_set writeset, errset;
611     FD_ZERO(&writeset); FD_ZERO(&errset);
612     for (pn = nkids; pn--; ) {
613     if (kidRow[pn] < 0) { // child already ready?
614     if (forceWait) continue;
615     return pn; // good enough
616     }
617     FD_SET(kid[pn].w, &writeset); // will check on this one
618     FD_SET(kid[pn].w, &errset);
619     if (kid[pn].w >= n)
620     n = kid[pn].w + 1;
621     }
622     if (!n) // every child is idle?
623     return -1;
624     // wait on "busy" child(ren)
625     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
626     if (errno != EINTR) {
627     error(SYSTEM, "select call failed in GetChild()");
628     return -1;
629     }
630     pn = -1; // get flags set by select
631     for (n = nkids; n--; )
632     if (kidRow[n] >= 0 &&
633     FD_ISSET(kid[n].w, &writeset) |
634     FD_ISSET(kid[n].w, &errset)) {
635     // update output row counts
636 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
637     UpdateRowsDone(kidRow[n]);
638 greg 2.1 kidRow[n] = -1; // flag it available
639     pn = n;
640     }
641     return pn;
642     }
643    
644     // Update row completion bitmap and update outputs (does not change rInPos)
645     bool
646     RcontribSimulManager::UpdateRowsDone(int r)
647     {
648     if (!rowsDone.TestAndSet(r)) {
649     error(WARNING, "redundant call to UpdateRowsDone()");
650     return false;
651     }
652     int nDone = GetRowFinished();
653     if (nDone <= r)
654     return true; // nothing to update, yet
655     for (RcontribOutput *op = outList; op; op = op->next)
656     if (!op->SetRowsDone(nDone))
657     return false;
658     return true; // up-to-date
659     }
660    
661     // Run rcontrib child process (never returns)
662     void
663     RcontribSimulManager::RunChild()
664     {
665     FVECT * vecList = NULL;
666     RowAssignment rass;
667     ssize_t nr;
668    
669     accum = 0;
670     errno = 0;
671     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
672     if (!rass.ac) {
673     error(CONSISTENCY, "bad accumulator count in child");
674     exit(1);
675     }
676     if (rass.ac > accum)
677     vecList = (FVECT *)erealloc(vecList,
678     sizeof(FVECT)*2*rass.ac);
679     accum = rass.ac;
680     rInPos = rass.row;
681    
682     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
683     sizeof(FVECT)*2*accum)
684     break;
685    
686     if (ComputeRecord(vecList) <= 0)
687     exit(1);
688     }
689     if (nr) {
690     error(SYSTEM, "read error in child process");
691     exit(1);
692     }
693     exit(0);
694     }
695    
696     // Add child processes as indicated
697     bool
698     RcontribSimulManager::StartKids(int n2go)
699     {
700     if ((n2go <= 0) | (nkids + n2go <= 1))
701     return false;
702    
703     if (!nkids)
704     cow_memshare(); // preload objects
705    
706     n2go += nkids; // => desired brood size
707     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
708     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
709    
710     fflush(stdout); // shouldn't use, anyway
711     while (nkids < n2go) {
712     kid[nkids] = sp_inactive;
713     kid[nkids].w = dup(1);
714     kid[nkids].flags |= PF_FILT_OUT;
715     int rv = open_process(&kid[nkids], NULL);
716     if (!rv) { // in child process?
717     while (nkids-- > 0)
718     close(kid[nkids].w);
719     free(kid); free(kidRow);
720     kid = NULL; kidRow = NULL;
721     RunChild(); // should never return
722     _exit(1);
723     }
724     if (rv < 0) {
725     error(SYSTEM, "cannot fork worker process");
726     return false;
727     }
728     kidRow[nkids++] = -1; // newborn is ready
729     }
730     return true;
731     }
732    
733     // Reap the indicated number of children (all if 0)
734     int
735     RcontribSimulManager::StopKids(int n2end)
736     {
737     if (nkids <= 0)
738     return 0;
739     FlushQueue();
740     int status = 0;
741     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
742     status = close_processes(kid, nkids);
743     free(kid); free(kidRow);
744     kid = NULL; kidRow = NULL;
745     nkids = 0;
746     } else { // else shrink family
747     int st;
748     while (n2end-- > 0)
749     if ((st = close_process(&kid[--nkids])) > 0)
750     status = st;
751     // could call realloc(), but it hardly matters
752     }
753     if (status) {
754     sprintf(errmsg, "non-zero (%d) status from child", status);
755     error(WARNING, errmsg);
756     }
757     return status;
758     }
759    
760     // Set number of computation threads (0 => #cores)
761     int
762     RcontribSimulManager::SetThreadCount(int nt)
763     {
764     if (!Ready()) {
765     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
766     return 0;
767     }
768     if (nt < 0)
769     return nkids;
770     if (!nt) nt = GetNCores();
771     int status = 0;
772     if (nt == 1)
773     status = StopKids();
774     else if (nt < nkids)
775     status = StopKids(nkids-nt);
776     else if (nt > nkids)
777     StartKids(nt-nkids);
778     if (status) {
779     sprintf(errmsg, "non-zero (%d) status from child", status);
780     error(WARNING, errmsg);
781     }
782     return nkids;
783     }