ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.1
Committed: Tue Oct 29 00:36:54 2024 UTC (6 months ago) by greg
Branch: MAIN
Log Message:
feat(rxcontrib): First compiled version of rxcontrib tool to test new C++ classes

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2     static const char RCSid[] = "$Id$";
3     #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27     extern int contrib; /* computing contributions? */
28     extern int lim_dist; /* limit distance? */
29    
30     // new/exclusive, overwrite if exists, or recover data
31     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
32     RDSread|RDSwrite};
33    
34     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
35     #define LNROWSTR 6
36    
37     // Modifier channel for recording contributions (no constructor/destructor)
38     struct RcontribMod {
39     RcontribOutput * opl; // pointer to first output channel
40     char * params; // parameters string
41     EPNODE * binv; // bin expression
42     int nbins; // bin count this modifier
43     int coffset; // column offset in bytes
44     DCOLORV cbin[1]; // bin accumulator (extends struct)
45     /// Access specific (spectral) color bin
46     DCOLORV * operator[](int n) {
47     if ((n < 0) | (n >= nbins)) return NULL;
48     return cbin + n*NCSAMP;
49     }
50     };
51    
52     // used to assign record calc to child
53     struct RowAssignment {
54     uint32 row; // row to do
55     uint32 ac; // accumulation count
56     };
57    
58     // Our default data share function
59     RdataShare *
60     defDataShare(const char *name, RCOutputOp op, size_t siz)
61     {
62     return new RdataShareMap(name, RSDOflags[op], siz);
63     }
64    
65     // Allocate rcontrib accumulator
66     RcontribMod *
67     NewRcMod(const char *prms, const char *binexpr, int ncbins)
68     {
69     if (ncbins <= 0) return NULL;
70     if (!prms) prms = "";
71     if (!binexpr | (ncbins == 1))
72     binexpr = "0";
73    
74     RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
75     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
76     strlen(prms)+1);
77    
78     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
79     mp->binv = eparse(const_cast<char *>(binexpr));
80     mp->nbins = ncbins;
81     return mp;
82     }
83    
84     // Free an RcontribMod
85     void
86     FreeRcMod(void *p)
87     {
88     if (!p) return;
89     epfree((*(RcontribMod *)p).binv, true);
90     efree(p);
91     }
92    
93     // Set output format ('f', 'd', or 'c')
94     bool
95     RcontribSimulManager::SetDataFormat(int ty)
96     {
97     if (outList) {
98     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
99     return false;
100     }
101     switch (ty) {
102     case 'f':
103     dsiz = sizeof(float)*NCSAMP;
104     break;
105     case 'd':
106     dsiz = sizeof(double)*NCSAMP;
107     break;
108     case 'c':
109     dsiz = LSCOLR;
110     break;
111     default:
112     sprintf(errmsg, "unsupported output format '%c'", ty);
113     error(INTERNAL, errmsg);
114     return false;
115     }
116     dtyp = ty;
117     return true;
118     }
119    
120     // static call-back for rcontrib ray-tracing
121     int
122     RcontribSimulManager::RctCall(RAY *r, void *cd)
123     {
124     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
125     return 0;
126     // shadow ray not on source?
127     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
128     return 0;
129    
130     const char * mname = objptr(r->ro->omod)->oname;
131     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
132     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
133     if (!mp)
134     return 0; // not in our modifier list
135    
136     worldfunc(RCCONTEXT, r); // compute bin #
137     set_eparams(mp->params);
138     double bval = evalue(mp->binv);
139     if (bval <= -.5)
140     return 0; // silently ignore negative bin index
141     DCOLORV * dvp = (*mp)[int(bval + .5)];
142     if (!dvp) {
143     sprintf(errmsg, "bad bin number for '%s' (%.1f ignored)", mname, bval);
144     error(WARNING, errmsg);
145     return 0;
146     }
147     SCOLOR contr;
148     raycontrib(contr, r, PRIMARY); // compute coefficient
149     if (contrib)
150     smultscolor(contr, r->rcol); // -> value contribution
151     for (int i = 0; i < NCSAMP; i++)
152     *dvp++ += contr[i]; // accumulate color/spectrum
153     return 1;
154     }
155    
// Look in printf-style string tst for the first conversion whose type
// character appears in match.  Returns the index of that type character,
// or 0 if there is no such conversion.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";

	for (const char *cp = tst; *cp; ) {
		if (*cp++ != '%')
			continue;
					// skip flags/width up to conversion type
		cp += strcspn(cp, convChars);
		if (*cp == '\0')	// string ended inside conversion
			return 0;
		if (strchr(match, *cp))
			return int(cp - tst);
		cp++;			// some other conversion (or "%%")
	}
	return 0;
}
173    
174     // Add a modifier and arguments, opening output file(s)
175     bool
176     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
177     const char *prms, const char *binval, int bincnt)
178     {
179     if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
180     error(WARNING, "ignoring bad call to AddModifier()");
181     return false;
182     }
183     if (!nChan) { // initial call?
184     if (!SetDataFormat(dtyp))
185     return false;
186     nChan = NCSAMP;
187     } else if (nChan != NCSAMP) {
188     error(INTERNAL, "number of spectral channels must be fixed");
189     return false;
190     }
191     if (Ready()) {
192     error(INTERNAL, "call to AddModifier() after PrepOutput()");
193     return false;
194     }
195     LUENT * lp = lu_find(&modLUT, modn);
196     if (lp->data) {
197     sprintf(errmsg, "duplicate modifier '%s'", modn);
198     error(USER, errmsg);
199     return false;
200     }
201     if (!lp->key) // create new entry
202     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
203    
204     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
205     if (!mp) return false;
206     lp->data = (char *)mp;
207     const int modndx = hasFormat(outspec, "sb");
208     const int binndx = hasFormat(outspec, "diouxX");
209     int bin0 = 0;
210     char fnbuf[512];
211     if (!modndx | (modndx > binndx))
212     sprintf(fnbuf, outspec, bin0, modn);
213     else
214     sprintf(fnbuf, outspec, modn, bin0);
215     RcontribOutput * olast = NULL;
216     RcontribOutput * op;
217     for (op = outList; op; op = (olast=op)->next) {
218     if (strcmp(op->GetName(), fnbuf))
219     continue;
220     if (modndx) { // this ain't right
221     sprintf(errmsg, "output name collision for '%s'", fnbuf);
222     error(USER, errmsg);
223     return false;
224     }
225     if ((binndx > 0) & (xres > 0)) {
226     sprintf(fnbuf, outspec, ++bin0);
227     continue; // each bin goes to another image
228     }
229     mp->coffset = op->rowBytes; // else add to what's there
230     break;
231     }
232     if (!op) { // create new output channel?
233     op = new RcontribOutput(fnbuf);
234     if (olast) olast->next = op;
235     else outList = op;
236     }
237     mp->opl = op; // first (maybe only) output channel
238     if (modndx) // remember modifier if part of name
239     op->omod = lp->key;
240     if (binndx > 0) { // append output image/bin list
241     op->rowBytes += dsiz;
242     op->obin = bin0;
243     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
244     if (!modndx | (modndx > binndx))
245     sprintf(fnbuf, outspec, bin0+bincnt, modn);
246     else
247     sprintf(fnbuf, outspec, modn, bin0+bincnt);
248     if (!op->next) {
249     olast = op;
250     olast->next = op = new RcontribOutput(fnbuf);
251     if (modndx) op->omod = lp->key;
252     op->obin = bin0+bincnt;
253     } else {
254     op = op->next;
255     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
256     "bin number mismatch in AddModifier()");
257     }
258     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
259     "row offset mismatch in AddModifier()");
260     op->rowBytes += dsiz;
261     }
262     } else // else send all results to this channel
263     op->rowBytes += bincnt*dsiz;
264     return true;
265     }
266    
267     // Add a file of modifiers with associated arguments
268     bool
269     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
270     const char *prms,
271     const char *binval, int bincnt)
272     {
273     char * path = getpath(const_cast<char *>(modfn),
274     getrlibpath(), R_OK);
275     FILE * fp;
276     if (!path || !(fp = fopen(path, "r"))) {
277     sprintf(errmsg, "cannot %s modifier file '%s'",
278     path ? "open" : "find", modfn);
279     error(SYSTEM, errmsg);
280     return false;
281     }
282     char mod[MAXSTR];
283     while (fgetword(mod, sizeof(mod), fp))
284     if (!AddModifier(mod, outspec, prms, binval, bincnt))
285     return false;
286     fclose(fp);
287     return true;
288     }
289    
290     // Run through current list of output struct's
291     int
292     RcontribSimulManager::GetOutputs(RoutputShareF *osF, void *cd) const
293     {
294     int cnt = 0;
295    
296     for (const RcontribOutput *op = outList; op; op = op->next) {
297     int rv = 1;
298     if (osF && (rv = (*osF)(op, cd)) < 0)
299     return rv;
300     cnt += rv;
301     }
302     return cnt;
303     }
304    
305     // Prepare output channels and return # completed rows
306     int
307     RcontribSimulManager::PrepOutput()
308     {
309     if (!outList || !RtraceSimulManager::Ready()) {
310     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
311     return -1;
312     }
313     int remWarnings = 20;
314     for (RcontribOutput *op = outList; op; op = op->next) {
315     if (op->rData) {
316     error(INTERNAL, "output channel already open in PrepOutput()");
317     return -1;
318     }
319     op->nRows = yres * (xres + !xres);
320     op->rData = (*cdsF)(op->ofname, outOp,
321     GetHeadLen()+1024 + op->nRows*op->rowBytes);
322     freeqstr(op->ofname); op->ofname = NULL;
323     if (outOp == RCOrecover) {
324     int rd = op->CheckHeader(this);
325     if (rd < 0)
326     return -1;
327     if (rd >= op->nRows) {
328     if (remWarnings >= 0) {
329     sprintf(errmsg, "recovered output '%s' already done",
330     op->GetName());
331     error(WARNING, remWarnings ? errmsg : "etc...");
332     remWarnings--;
333     }
334     rd = op->nRows;
335     }
336     if (!rInPos | (rInPos > rd)) rInPos = rd;
337     } else if (!op->NewHeader(this))
338     return -1;
339     // make sure there's room
340     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
341     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
342     return -1; // calls error() for us
343     }
344     if (lim_dist) // XXX where else to put this?
345     rtFlags |= RTlimDist;
346     else
347     rtFlags &= ~RTlimDist;
348    
349     rowsDone.NewBitMap(outList->nRows); // create row completion map
350     rowsDone.ClearBits(0, rInPos, true);
351     return rInPos;
352     }
353    
354     // Create header in open write-only channel
355     bool
356     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
357     {
358     int headlim = rcp->GetHeadLen() + 1024;
359     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
360     int esiz;
361     const int etyp = rcp->GetFormat(&esiz);
362    
363     strcpy(hdr, HDRSTR);
364     strcat(hdr, "RADIANCE\n");
365     begData = strlen(hdr);
366     strcpy(hdr+begData, rcp->GetHeadStr());
367     begData += rcp->GetHeadLen();
368     if (omod) {
369     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
370     begData += strlen(hdr+begData);
371     }
372     if (obin >= 0) {
373     sprintf(hdr+begData, "BIN=%d\n", obin);
374     begData += strlen(hdr+begData);
375     }
376     strcpy(hdr+begData, ROWZEROSTR);
377     rowCountPos = begData+LNROWSTR;
378     begData += sizeof(ROWZEROSTR)-1;
379     if (!xres | (rowBytes > esiz)) {
380     sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
381     begData += strlen(hdr+begData);
382     }
383     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
384     begData += strlen(hdr+begData);
385     if (NCSAMP > 3) {
386     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
387     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
388     begData += strlen(hdr+begData);
389     }
390     if (etyp != 'c') {
391     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
392     begData += strlen(hdr+begData);
393     }
394     int align = 0;
395     switch (etyp) {
396     case 'f':
397     align = sizeof(float);
398     break;
399     case 'd':
400     align = sizeof(double);
401     break;
402     case 'c':
403     break;
404     default:
405     error(INTERNAL, "unsupported data type in NewHeader()");
406     return false;
407     }
408     strcpy(hdr+begData, FMTSTR); // write format string
409     begData += strlen(hdr+begData);
410     strcpy(hdr+begData, formstr(etyp));
411     begData += strlen(hdr+begData);
412     if (align) // align data at end of header
413     while ((begData+2) % align)
414     hdr[begData++] = ' ';
415     hdr[begData++] = '\n'; // EOL for data format
416     hdr[begData++] = '\n'; // end of nominal header
417     if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
418     sprintf(hdr+begData, PIXSTDFMT, yres, xres);
419     begData += strlen(hdr+begData);
420     }
421     return rData->ReleaseMemory(hdr, RDSwrite);
422     }
423    
424     // find string argument in header for named variable
425     static const char *
426     findArgs(const char *hdr, const char *vnm, int len)
427     {
428     const char * npos = strnstr(hdr, vnm, len);
429    
430     if (!npos) return NULL;
431    
432     npos += strlen(vnm); // find start of (first) argument
433     len -= npos - hdr;
434     while (len-- > 0 && (*npos == '=') | isspace(*npos))
435     if (*npos++ == '\n')
436     return NULL;
437    
438     return (len >= 0) ? npos : NULL;
439     }
440    
441     // Load and check header in read/write channel
442     int
443     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
444     {
445     int esiz;
446     const int etyp = rcp->GetFormat(&esiz);
447     const int maxlen = rcp->GetHeadLen() + 1024;
448     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
449     const char * cp;
450     // find end of header
451     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
452     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
453     error(USER, errmsg);
454     return -1;
455     }
456     begData = cp - hdr + 1; // increment again at end
457     // check # components
458     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
459     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
460     error(USER, errmsg);
461     return -1;
462     }
463     // check format
464     if (!(cp = findArgs(hdr, FMTSTR, begData)) || strcmp(cp, formstr(etyp))) {
465     sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
466     error(USER, errmsg);
467     return -1;
468     }
469     // check #columns
470     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
471     !xres | (rowBytes > esiz)) {
472     sprintf(errmsg, "expected NCOLS=%d in '%s'",
473     int(rowBytes/esiz), GetName());
474     error(USER, errmsg);
475     return -1;
476     }
477     // find row count
478     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
479     sprintf(errmsg, "missing NROWS in '%s'", GetName());
480     error(USER, errmsg);
481     return -1;
482     }
483     rowCountPos = cp - hdr;
484     int rlast = atoi(cp);
485     begData++; // advance past closing EOL
486     if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
487     char rbuf[64];
488     sprintf(rbuf, PIXSTDFMT, yres, xres);
489     int rlen = strlen(rbuf);
490     if (strncmp(rbuf, hdr+begData, rlen)) {
491     sprintf(errmsg, "bad resolution string in '%s'", GetName());
492     error(USER, errmsg);
493     return -1;
494     }
495     begData += rlen;
496     }
497     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
498     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
499     }
500    
501     // Rewind calculation (previous results unchanged)
502     bool
503     RcontribSimulManager::ResetRow(int r)
504     {
505     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
506     error(WARNING, "ignoring bad call to ResetRow()");
507     return (r == rInPos);
508     }
509     FlushQueue(); // finish current and reset
510     for (RcontribOutput *op = outList; op; op = op->next)
511     if (!op->SetRowsDone(r))
512     return false;
513    
514     rowsDone.ClearBits(r, rInPos-r, false);
515     rInPos = r;
516     return true;
517     }
518    
519     // call-back for averaging each modifier's contributions to assigned channel(s)
520     static int
521     putModContrib(const LUENT *lp, void *p)
522     {
523     RcontribMod * mp = (RcontribMod *)lp->data;
524     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
525     const double sca = 1./rcp->accum;
526     const DCOLORV * dvp = mp->cbin;
527     RcontribOutput * op = mp->opl;
528     int i, n;
529    
530     switch (rcp->GetFormat()) { // conversion based on output type
531     case 'd': {
532     double * dvo = (double *)op->InsertionP(mp->coffset);
533     for (n = mp->nbins; n--; ) {
534     for (i = 0; i < NCSAMP; i++)
535     *dvo++ = *dvp++ * sca;
536     if ((op->obin >= 0) & (n > 0))
537     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
538     }
539     } break;
540     case 'f': {
541     float * fvo = (float *)op->InsertionP(mp->coffset);
542     for (n = mp->nbins; n--; ) {
543     for (i = 0; i < NCSAMP; i++)
544     *fvo++ = float(*dvp++ * sca);
545     if ((op->obin >= 0) & (n > 0))
546     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
547     }
548     } break;
549     case 'c': {
550     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
551     for (n = mp->nbins; n--; ) {
552     SCOLOR scol;
553     for (i = 0; i < NCSAMP; i++)
554     scol[i] = COLORV(*dvp++ * sca);
555     scolor_scolr(cvo, scol);
556     cvo += LSCOLR;
557     if ((op->obin >= 0) & (n > 0))
558     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
559     }
560     } break;
561     default:
562     error(CONSISTENCY, "unsupported output type in sendModContrib()");
563     return -1;
564     }
565     // clear for next tally
566     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
567     return 1;
568     }
569    
570     // Add a ray/bundle to compute next record
571     int
572     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
573     {
574     if (!Ready())
575     return 0;
576     if (rInPos >= outList->nRows) {
577     error(WARNING, "ComputeRecord() called after last record");
578     return 0;
579     }
580     if (nkids > 0) { // in parent process?
581     int k = GetChild(); // updates output rows
582     if (k < 0) return -1; // can't really happen
583     RowAssignment rass;
584     rass.row = kidRow[k] = rInPos++;
585     rass.ac = accum;
586     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
587     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
588     != sizeof(FVECT)*2*accum) {
589     error(SYSTEM, "cannot write to child; dead process?");
590     return -1;
591     }
592     return accum; // tracing/output happens in child
593     }
594     if (NCSAMP != nChan) {
595     error(INTERNAL, "number of color channels must remain fixed");
596     return -1;
597     }
598     // actual work is done here...
599     if (EnqueueBundle(orig_direc, accum) < 0)
600     return -1;
601    
602     RcontribOutput * op; // prepare output buffers
603     for (op = outList; op; op = op->next)
604     if (!op->GetRow(rInPos))
605     return -1;
606     // convert averages & clear
607     if (lu_doall(&modLUT, putModContrib, this) < 0)
608     return -1;
609     // write buffers
610     for (op = outList; op; op = op->next)
611     op->DoneRow();
612    
613     if (!nkids) // update row if solo process
614     UpdateRowsDone(rInPos++); // XXX increment here is critical
615    
616     return accum;
617     }
618    
619     // Get next available child, returning index or -1 if forceWait & idling
620     int
621     RcontribSimulManager::GetChild(bool forceWait)
622     {
623     if (nkids <= 0)
624     return -1;
625     // take inventory
626     int pn, n = 0;
627     fd_set writeset, errset;
628     FD_ZERO(&writeset); FD_ZERO(&errset);
629     for (pn = nkids; pn--; ) {
630     if (kidRow[pn] < 0) { // child already ready?
631     if (forceWait) continue;
632     return pn; // good enough
633     }
634     FD_SET(kid[pn].w, &writeset); // will check on this one
635     FD_SET(kid[pn].w, &errset);
636     if (kid[pn].w >= n)
637     n = kid[pn].w + 1;
638     }
639     if (!n) // every child is idle?
640     return -1;
641     // wait on "busy" child(ren)
642     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
643     if (errno != EINTR) {
644     error(SYSTEM, "select call failed in GetChild()");
645     return -1;
646     }
647     pn = -1; // get flags set by select
648     for (n = nkids; n--; )
649     if (kidRow[n] >= 0 &&
650     FD_ISSET(kid[n].w, &writeset) |
651     FD_ISSET(kid[n].w, &errset)) {
652     // update output row counts
653     UpdateRowsDone(kidRow[n]);
654     kidRow[n] = -1; // flag it available
655     pn = n;
656     }
657     return pn;
658     }
659    
660     // Update row completion bitmap and update outputs (does not change rInPos)
661     bool
662     RcontribSimulManager::UpdateRowsDone(int r)
663     {
664     if (!rowsDone.TestAndSet(r)) {
665     error(WARNING, "redundant call to UpdateRowsDone()");
666     return false;
667     }
668     int nDone = GetRowFinished();
669     if (nDone <= r)
670     return true; // nothing to update, yet
671     for (RcontribOutput *op = outList; op; op = op->next)
672     if (!op->SetRowsDone(nDone))
673     return false;
674     return true; // up-to-date
675     }
676    
677     // Run rcontrib child process (never returns)
678     void
679     RcontribSimulManager::RunChild()
680     {
681     FVECT * vecList = NULL;
682     RowAssignment rass;
683     ssize_t nr;
684    
685     accum = 0;
686     errno = 0;
687     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
688     if (!rass.ac) {
689     error(CONSISTENCY, "bad accumulator count in child");
690     exit(1);
691     }
692     if (rass.ac > accum)
693     vecList = (FVECT *)erealloc(vecList,
694     sizeof(FVECT)*2*rass.ac);
695     accum = rass.ac;
696     rInPos = rass.row;
697    
698     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
699     sizeof(FVECT)*2*accum)
700     break;
701    
702     if (ComputeRecord(vecList) <= 0)
703     exit(1);
704     }
705     if (nr) {
706     error(SYSTEM, "read error in child process");
707     exit(1);
708     }
709     exit(0);
710     }
711    
712     // Add child processes as indicated
713     bool
714     RcontribSimulManager::StartKids(int n2go)
715     {
716     if ((n2go <= 0) | (nkids + n2go <= 1))
717     return false;
718    
719     if (!nkids)
720     cow_memshare(); // preload objects
721    
722     n2go += nkids; // => desired brood size
723     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
724     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
725    
726     fflush(stdout); // shouldn't use, anyway
727     while (nkids < n2go) {
728     kid[nkids] = sp_inactive;
729     kid[nkids].w = dup(1);
730     kid[nkids].flags |= PF_FILT_OUT;
731     int rv = open_process(&kid[nkids], NULL);
732     if (!rv) { // in child process?
733     while (nkids-- > 0)
734     close(kid[nkids].w);
735     free(kid); free(kidRow);
736     kid = NULL; kidRow = NULL;
737     rowsDone.NewBitMap(0);
738     RunChild(); // should never return
739     _exit(1);
740     }
741     if (rv < 0) {
742     error(SYSTEM, "cannot fork worker process");
743     return false;
744     }
745     kidRow[nkids++] = -1; // newborn is ready
746     }
747     return true;
748     }
749    
750     // Reap the indicated number of children (all if 0)
751     int
752     RcontribSimulManager::StopKids(int n2end)
753     {
754     if (nkids <= 0)
755     return 0;
756     FlushQueue();
757     int status = 0;
758     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
759     status = close_processes(kid, nkids);
760     free(kid); free(kidRow);
761     kid = NULL; kidRow = NULL;
762     nkids = 0;
763     } else { // else shrink family
764     int st;
765     while (n2end-- > 0)
766     if ((st = close_process(&kid[--nkids])) > 0)
767     status = st;
768     // could call realloc(), but it hardly matters
769     }
770     if (status) {
771     sprintf(errmsg, "non-zero (%d) status from child", status);
772     error(WARNING, errmsg);
773     }
774     return status;
775     }
776    
777     // Set number of computation threads (0 => #cores)
778     int
779     RcontribSimulManager::SetThreadCount(int nt)
780     {
781     if (!Ready()) {
782     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
783     return 0;
784     }
785     if (nt < 0)
786     return nkids;
787     if (!nt) nt = GetNCores();
788     int status = 0;
789     if (nt == 1)
790     status = StopKids();
791     else if (nt < nkids)
792     status = StopKids(nkids-nt);
793     else if (nt > nkids)
794     StartKids(nt-nkids);
795     if (status) {
796     sprintf(errmsg, "non-zero (%d) status from child", status);
797     error(WARNING, errmsg);
798     }
799     return nkids;
800     }