ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.5
Committed: Fri Nov 1 23:05:01 2024 UTC (5 months, 4 weeks ago) by greg
Branch: MAIN
Changes since 2.4: +6 -2 lines
Log Message:
feat(rxcontrib): Added a couple of minor sanity checks

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.5 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.4 2024/11/01 16:17:33 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27 greg 2.2 int contrib = 0; // computing contributions?
28    
29     int xres = 0; // horizontal (scan) size
30     int yres = 0; // vertical resolution
31 greg 2.1
32     // new/exclusive, overwrite if exists, or recover data
33     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
34     RDSread|RDSwrite};
35    
36     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
37     #define LNROWSTR 6
38    
39     // Modifier channel for recording contributions (no constructor/destructor)
40     struct RcontribMod {
41     RcontribOutput * opl; // pointer to first output channel
42     char * params; // parameters string
43 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
44 greg 2.1 int nbins; // bin count this modifier
45     int coffset; // column offset in bytes
46     DCOLORV cbin[1]; // bin accumulator (extends struct)
47     /// Access specific (spectral) color bin
48     DCOLORV * operator[](int n) {
49     if ((n < 0) | (n >= nbins)) return NULL;
50     return cbin + n*NCSAMP;
51     }
52     };
53    
54 greg 2.2 // Struct used to assign record calculation to child
55 greg 2.1 struct RowAssignment {
56     uint32 row; // row to do
57     uint32 ac; // accumulation count
58     };
59    
60     // Our default data share function
61     RdataShare *
62     defDataShare(const char *name, RCOutputOp op, size_t siz)
63     {
64     return new RdataShareMap(name, RSDOflags[op], siz);
65     }
66    
67     // Allocate rcontrib accumulator
68     RcontribMod *
69     NewRcMod(const char *prms, const char *binexpr, int ncbins)
70     {
71     if (ncbins <= 0) return NULL;
72     if (!prms) prms = "";
73 greg 2.4 if (!binexpr & (ncbins > 1)) {
74     error(INTERNAL, "missing bin expression");
75     return NULL;
76     }
77     if (ncbins == 1) { // shouldn't have bin expression?
78     if (binexpr && strcmp(binexpr, "0"))
79     error(WARNING, "ignoring non-zero expression for single bin");
80     prms = "";
81     binexpr = NULL;
82     }
83 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
84     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
85     strlen(prms)+1);
86    
87     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
88 greg 2.4 if (binexpr) {
89     mp->binv = eparse(const_cast<char *>(binexpr));
90     CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
91     }
92 greg 2.1 mp->nbins = ncbins;
93     return mp;
94     }
95    
96     // Free an RcontribMod
97     void
98     FreeRcMod(void *p)
99     {
100     if (!p) return;
101 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
102     if (bep) epfree(bep, true);
103 greg 2.1 efree(p);
104     }
105    
106     // Set output format ('f', 'd', or 'c')
107     bool
108     RcontribSimulManager::SetDataFormat(int ty)
109     {
110     if (outList) {
111     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
112     return false;
113     }
114     switch (ty) {
115     case 'f':
116     dsiz = sizeof(float)*NCSAMP;
117     break;
118     case 'd':
119     dsiz = sizeof(double)*NCSAMP;
120     break;
121     case 'c':
122     dsiz = LSCOLR;
123     break;
124     default:
125     sprintf(errmsg, "unsupported output format '%c'", ty);
126     error(INTERNAL, errmsg);
127     return false;
128     }
129     dtyp = ty;
130     return true;
131     }
132    
133     // static call-back for rcontrib ray-tracing
134     int
135     RcontribSimulManager::RctCall(RAY *r, void *cd)
136     {
137     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
138     return 0;
139     // shadow ray not on source?
140     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
141     return 0;
142    
143     const char * mname = objptr(r->ro->omod)->oname;
144     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
145     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
146     if (!mp)
147     return 0; // not in our modifier list
148    
149 greg 2.4 int bi = 0; // get bin index
150     if (mp->binv) {
151 greg 2.5 worldfunc(RCCONTEXT, r);
152 greg 2.4 set_eparams(mp->params);
153     double bval = evalue(mp->binv);
154     if (bval <= -.5)
155     return 0; // silently ignore negative bin index
156     bi = int(bval + .5);
157     }
158     DCOLORV * dvp = (*mp)[bi];
159 greg 2.1 if (!dvp) {
160 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
161 greg 2.1 error(WARNING, errmsg);
162     return 0;
163     }
164     SCOLOR contr;
165     raycontrib(contr, r, PRIMARY); // compute coefficient
166     if (contrib)
167     smultscolor(contr, r->rcol); // -> value contribution
168     for (int i = 0; i < NCSAMP; i++)
169     *dvp++ += contr[i]; // accumulate color/spectrum
170     return 1;
171     }
172    
173     // check for given format type in string, return last char position
174     static int
175     hasFormat(const char *tst, const char *match)
176     {
177     static const char allPoss[] = "%diouxXfFeEgGaAcsb";
178     const char * s = tst;
179     while (*s) {
180     if (*s++ != '%')
181     continue;
182     while (!strchr(allPoss, *s))
183     s++;
184     if (strchr(match, *s))
185     break;
186     s++;
187     }
188     return (*s != '\0')*(s - tst);
189     }
190    
191     // Add a modifier and arguments, opening output file(s)
192     bool
193     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
194     const char *prms, const char *binval, int bincnt)
195     {
196     if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
197     error(WARNING, "ignoring bad call to AddModifier()");
198     return false;
199     }
200     if (!nChan) { // initial call?
201     if (!SetDataFormat(dtyp))
202     return false;
203     nChan = NCSAMP;
204     } else if (nChan != NCSAMP) {
205     error(INTERNAL, "number of spectral channels must be fixed");
206     return false;
207     }
208     if (Ready()) {
209     error(INTERNAL, "call to AddModifier() after PrepOutput()");
210     return false;
211     }
212     LUENT * lp = lu_find(&modLUT, modn);
213     if (lp->data) {
214     sprintf(errmsg, "duplicate modifier '%s'", modn);
215     error(USER, errmsg);
216     return false;
217     }
218     if (!lp->key) // create new entry
219     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
220    
221     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
222     if (!mp) return false;
223     lp->data = (char *)mp;
224     const int modndx = hasFormat(outspec, "sb");
225     const int binndx = hasFormat(outspec, "diouxX");
226     int bin0 = 0;
227     char fnbuf[512];
228     if (!modndx | (modndx > binndx))
229     sprintf(fnbuf, outspec, bin0, modn);
230     else
231     sprintf(fnbuf, outspec, modn, bin0);
232     RcontribOutput * olast = NULL;
233     RcontribOutput * op;
234     for (op = outList; op; op = (olast=op)->next) {
235     if (strcmp(op->GetName(), fnbuf))
236     continue;
237     if (modndx) { // this ain't right
238     sprintf(errmsg, "output name collision for '%s'", fnbuf);
239     error(USER, errmsg);
240     return false;
241     }
242     if ((binndx > 0) & (xres > 0)) {
243     sprintf(fnbuf, outspec, ++bin0);
244     continue; // each bin goes to another image
245     }
246     mp->coffset = op->rowBytes; // else add to what's there
247     break;
248     }
249     if (!op) { // create new output channel?
250     op = new RcontribOutput(fnbuf);
251     if (olast) olast->next = op;
252     else outList = op;
253     }
254     mp->opl = op; // first (maybe only) output channel
255     if (modndx) // remember modifier if part of name
256     op->omod = lp->key;
257     if (binndx > 0) { // append output image/bin list
258     op->rowBytes += dsiz;
259     op->obin = bin0;
260     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
261     if (!modndx | (modndx > binndx))
262     sprintf(fnbuf, outspec, bin0+bincnt, modn);
263     else
264     sprintf(fnbuf, outspec, modn, bin0+bincnt);
265     if (!op->next) {
266     olast = op;
267     olast->next = op = new RcontribOutput(fnbuf);
268     if (modndx) op->omod = lp->key;
269     op->obin = bin0+bincnt;
270     } else {
271     op = op->next;
272     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
273     "bin number mismatch in AddModifier()");
274     }
275     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
276     "row offset mismatch in AddModifier()");
277     op->rowBytes += dsiz;
278     }
279     } else // else send all results to this channel
280     op->rowBytes += bincnt*dsiz;
281     return true;
282     }
283    
284     // Add a file of modifiers with associated arguments
285     bool
286     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
287     const char *prms,
288     const char *binval, int bincnt)
289     {
290     char * path = getpath(const_cast<char *>(modfn),
291     getrlibpath(), R_OK);
292     FILE * fp;
293     if (!path || !(fp = fopen(path, "r"))) {
294     sprintf(errmsg, "cannot %s modifier file '%s'",
295     path ? "open" : "find", modfn);
296     error(SYSTEM, errmsg);
297     return false;
298     }
299     char mod[MAXSTR];
300     while (fgetword(mod, sizeof(mod), fp))
301     if (!AddModifier(mod, outspec, prms, binval, bincnt))
302     return false;
303     fclose(fp);
304     return true;
305     }
306    
307 greg 2.4 // call-back to check if modifier has been loaded
308     static int
309     checkModExists(const LUENT *lp, void *p)
310     {
311     if (modifier(lp->key) != OVOID)
312     return 1;
313    
314     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
315     error(WARNING, errmsg);
316     return 0;
317     }
318    
319 greg 2.1 // Prepare output channels and return # completed rows
320     int
321     RcontribSimulManager::PrepOutput()
322     {
323     if (!outList || !RtraceSimulManager::Ready()) {
324     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
325     return -1;
326     }
327 greg 2.5 if (!cdsF) {
328     error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
329     return -1;
330     }
331 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
332     return -1;
333    
334 greg 2.1 int remWarnings = 20;
335     for (RcontribOutput *op = outList; op; op = op->next) {
336     if (op->rData) {
337     error(INTERNAL, "output channel already open in PrepOutput()");
338     return -1;
339     }
340     op->nRows = yres * (xres + !xres);
341     op->rData = (*cdsF)(op->ofname, outOp,
342     GetHeadLen()+1024 + op->nRows*op->rowBytes);
343     freeqstr(op->ofname); op->ofname = NULL;
344     if (outOp == RCOrecover) {
345     int rd = op->CheckHeader(this);
346     if (rd < 0)
347     return -1;
348     if (rd >= op->nRows) {
349     if (remWarnings >= 0) {
350     sprintf(errmsg, "recovered output '%s' already done",
351     op->GetName());
352     error(WARNING, remWarnings ? errmsg : "etc...");
353     remWarnings--;
354     }
355     rd = op->nRows;
356     }
357     if (!rInPos | (rInPos > rd)) rInPos = rd;
358     } else if (!op->NewHeader(this))
359     return -1;
360     // make sure there's room
361     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
362     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
363     return -1; // calls error() for us
364     }
365     rowsDone.NewBitMap(outList->nRows); // create row completion map
366     rowsDone.ClearBits(0, rInPos, true);
367     return rInPos;
368     }
369    
370     // Create header in open write-only channel
371     bool
372     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
373     {
374     int headlim = rcp->GetHeadLen() + 1024;
375     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
376     int esiz;
377     const int etyp = rcp->GetFormat(&esiz);
378    
379     strcpy(hdr, HDRSTR);
380     strcat(hdr, "RADIANCE\n");
381     begData = strlen(hdr);
382     strcpy(hdr+begData, rcp->GetHeadStr());
383     begData += rcp->GetHeadLen();
384     if (omod) {
385     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
386     begData += strlen(hdr+begData);
387     }
388     if (obin >= 0) {
389     sprintf(hdr+begData, "BIN=%d\n", obin);
390     begData += strlen(hdr+begData);
391     }
392     strcpy(hdr+begData, ROWZEROSTR);
393     rowCountPos = begData+LNROWSTR;
394     begData += sizeof(ROWZEROSTR)-1;
395     if (!xres | (rowBytes > esiz)) {
396     sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
397     begData += strlen(hdr+begData);
398     }
399     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
400     begData += strlen(hdr+begData);
401     if (NCSAMP > 3) {
402     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
403     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
404     begData += strlen(hdr+begData);
405     }
406     if (etyp != 'c') {
407     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
408     begData += strlen(hdr+begData);
409     }
410     int align = 0;
411     switch (etyp) {
412     case 'f':
413     align = sizeof(float);
414     break;
415     case 'd':
416     align = sizeof(double);
417     break;
418     case 'c':
419     break;
420     default:
421     error(INTERNAL, "unsupported data type in NewHeader()");
422     return false;
423     }
424     strcpy(hdr+begData, FMTSTR); // write format string
425     begData += strlen(hdr+begData);
426     strcpy(hdr+begData, formstr(etyp));
427     begData += strlen(hdr+begData);
428     if (align) // align data at end of header
429     while ((begData+2) % align)
430     hdr[begData++] = ' ';
431     hdr[begData++] = '\n'; // EOL for data format
432     hdr[begData++] = '\n'; // end of nominal header
433     if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
434     sprintf(hdr+begData, PIXSTDFMT, yres, xres);
435     begData += strlen(hdr+begData);
436     }
437     return rData->ReleaseMemory(hdr, RDSwrite);
438     }
439    
440     // find string argument in header for named variable
441     static const char *
442     findArgs(const char *hdr, const char *vnm, int len)
443     {
444     const char * npos = strnstr(hdr, vnm, len);
445    
446     if (!npos) return NULL;
447    
448     npos += strlen(vnm); // find start of (first) argument
449     len -= npos - hdr;
450     while (len-- > 0 && (*npos == '=') | isspace(*npos))
451     if (*npos++ == '\n')
452     return NULL;
453    
454     return (len >= 0) ? npos : NULL;
455     }
456    
457     // Load and check header in read/write channel
458     int
459     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
460     {
461     int esiz;
462     const int etyp = rcp->GetFormat(&esiz);
463     const int maxlen = rcp->GetHeadLen() + 1024;
464     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
465     const char * cp;
466     // find end of header
467     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
468     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
469     error(USER, errmsg);
470     return -1;
471     }
472     begData = cp - hdr + 1; // increment again at end
473     // check # components
474     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
475     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
476     error(USER, errmsg);
477     return -1;
478     }
479     // check format
480 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
481     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
482 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
483     error(USER, errmsg);
484     return -1;
485     }
486     // check #columns
487     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
488     !xres | (rowBytes > esiz)) {
489     sprintf(errmsg, "expected NCOLS=%d in '%s'",
490     int(rowBytes/esiz), GetName());
491     error(USER, errmsg);
492     return -1;
493     }
494     // find row count
495     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
496     sprintf(errmsg, "missing NROWS in '%s'", GetName());
497     error(USER, errmsg);
498     return -1;
499     }
500     rowCountPos = cp - hdr;
501     int rlast = atoi(cp);
502     begData++; // advance past closing EOL
503     if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
504     char rbuf[64];
505     sprintf(rbuf, PIXSTDFMT, yres, xres);
506     int rlen = strlen(rbuf);
507     if (strncmp(rbuf, hdr+begData, rlen)) {
508     sprintf(errmsg, "bad resolution string in '%s'", GetName());
509     error(USER, errmsg);
510     return -1;
511     }
512     begData += rlen;
513     }
514     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
515     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
516     }
517    
518     // Rewind calculation (previous results unchanged)
519     bool
520     RcontribSimulManager::ResetRow(int r)
521     {
522     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
523     error(WARNING, "ignoring bad call to ResetRow()");
524     return (r == rInPos);
525     }
526     FlushQueue(); // finish current and reset
527     for (RcontribOutput *op = outList; op; op = op->next)
528     if (!op->SetRowsDone(r))
529     return false;
530    
531     rowsDone.ClearBits(r, rInPos-r, false);
532     rInPos = r;
533     return true;
534     }
535    
536     // call-back for averaging each modifier's contributions to assigned channel(s)
537     static int
538     putModContrib(const LUENT *lp, void *p)
539     {
540     RcontribMod * mp = (RcontribMod *)lp->data;
541     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
542     const double sca = 1./rcp->accum;
543     const DCOLORV * dvp = mp->cbin;
544     RcontribOutput * op = mp->opl;
545     int i, n;
546    
547     switch (rcp->GetFormat()) { // conversion based on output type
548     case 'd': {
549     double * dvo = (double *)op->InsertionP(mp->coffset);
550     for (n = mp->nbins; n--; ) {
551     for (i = 0; i < NCSAMP; i++)
552     *dvo++ = *dvp++ * sca;
553     if ((op->obin >= 0) & (n > 0))
554     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
555     }
556     } break;
557     case 'f': {
558     float * fvo = (float *)op->InsertionP(mp->coffset);
559     for (n = mp->nbins; n--; ) {
560     for (i = 0; i < NCSAMP; i++)
561     *fvo++ = float(*dvp++ * sca);
562     if ((op->obin >= 0) & (n > 0))
563     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
564     }
565     } break;
566     case 'c': {
567     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
568     for (n = mp->nbins; n--; ) {
569     SCOLOR scol;
570     for (i = 0; i < NCSAMP; i++)
571     scol[i] = COLORV(*dvp++ * sca);
572     scolor_scolr(cvo, scol);
573     cvo += LSCOLR;
574     if ((op->obin >= 0) & (n > 0))
575     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
576     }
577     } break;
578     default:
579     error(CONSISTENCY, "unsupported output type in sendModContrib()");
580     return -1;
581     }
582     // clear for next tally
583     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
584     return 1;
585     }
586    
587     // Add a ray/bundle to compute next record
588     int
589     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
590     {
591     if (!Ready())
592     return 0;
593     if (rInPos >= outList->nRows) {
594     error(WARNING, "ComputeRecord() called after last record");
595     return 0;
596     }
597     if (nkids > 0) { // in parent process?
598     int k = GetChild(); // updates output rows
599     if (k < 0) return -1; // can't really happen
600     RowAssignment rass;
601     rass.row = kidRow[k] = rInPos++;
602     rass.ac = accum;
603     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
604     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
605     != sizeof(FVECT)*2*accum) {
606     error(SYSTEM, "cannot write to child; dead process?");
607     return -1;
608     }
609     return accum; // tracing/output happens in child
610     }
611     if (NCSAMP != nChan) {
612     error(INTERNAL, "number of color channels must remain fixed");
613     return -1;
614     }
615     // actual work is done here...
616     if (EnqueueBundle(orig_direc, accum) < 0)
617     return -1;
618    
619     RcontribOutput * op; // prepare output buffers
620     for (op = outList; op; op = op->next)
621     if (!op->GetRow(rInPos))
622     return -1;
623     // convert averages & clear
624     if (lu_doall(&modLUT, putModContrib, this) < 0)
625     return -1;
626     // write buffers
627     for (op = outList; op; op = op->next)
628     op->DoneRow();
629    
630     if (!nkids) // update row if solo process
631     UpdateRowsDone(rInPos++); // XXX increment here is critical
632    
633     return accum;
634     }
635    
636     // Get next available child, returning index or -1 if forceWait & idling
637     int
638     RcontribSimulManager::GetChild(bool forceWait)
639     {
640     if (nkids <= 0)
641     return -1;
642     // take inventory
643     int pn, n = 0;
644     fd_set writeset, errset;
645     FD_ZERO(&writeset); FD_ZERO(&errset);
646     for (pn = nkids; pn--; ) {
647     if (kidRow[pn] < 0) { // child already ready?
648     if (forceWait) continue;
649     return pn; // good enough
650     }
651     FD_SET(kid[pn].w, &writeset); // will check on this one
652     FD_SET(kid[pn].w, &errset);
653     if (kid[pn].w >= n)
654     n = kid[pn].w + 1;
655     }
656     if (!n) // every child is idle?
657     return -1;
658     // wait on "busy" child(ren)
659     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
660     if (errno != EINTR) {
661     error(SYSTEM, "select call failed in GetChild()");
662     return -1;
663     }
664     pn = -1; // get flags set by select
665     for (n = nkids; n--; )
666     if (kidRow[n] >= 0 &&
667     FD_ISSET(kid[n].w, &writeset) |
668     FD_ISSET(kid[n].w, &errset)) {
669     // update output row counts
670 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
671     UpdateRowsDone(kidRow[n]);
672 greg 2.1 kidRow[n] = -1; // flag it available
673     pn = n;
674     }
675     return pn;
676     }
677    
678     // Update row completion bitmap and update outputs (does not change rInPos)
679     bool
680     RcontribSimulManager::UpdateRowsDone(int r)
681     {
682     if (!rowsDone.TestAndSet(r)) {
683     error(WARNING, "redundant call to UpdateRowsDone()");
684     return false;
685     }
686     int nDone = GetRowFinished();
687     if (nDone <= r)
688     return true; // nothing to update, yet
689     for (RcontribOutput *op = outList; op; op = op->next)
690     if (!op->SetRowsDone(nDone))
691     return false;
692     return true; // up-to-date
693     }
694    
695     // Run rcontrib child process (never returns)
696     void
697     RcontribSimulManager::RunChild()
698     {
699     FVECT * vecList = NULL;
700     RowAssignment rass;
701     ssize_t nr;
702    
703     accum = 0;
704     errno = 0;
705     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
706     if (!rass.ac) {
707     error(CONSISTENCY, "bad accumulator count in child");
708     exit(1);
709     }
710     if (rass.ac > accum)
711     vecList = (FVECT *)erealloc(vecList,
712     sizeof(FVECT)*2*rass.ac);
713     accum = rass.ac;
714     rInPos = rass.row;
715    
716     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
717     sizeof(FVECT)*2*accum)
718     break;
719    
720     if (ComputeRecord(vecList) <= 0)
721     exit(1);
722     }
723     if (nr) {
724     error(SYSTEM, "read error in child process");
725     exit(1);
726     }
727     exit(0);
728     }
729    
730     // Add child processes as indicated
731     bool
732     RcontribSimulManager::StartKids(int n2go)
733     {
734     if ((n2go <= 0) | (nkids + n2go <= 1))
735     return false;
736    
737     if (!nkids)
738     cow_memshare(); // preload objects
739    
740     n2go += nkids; // => desired brood size
741     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
742     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
743    
744     fflush(stdout); // shouldn't use, anyway
745     while (nkids < n2go) {
746     kid[nkids] = sp_inactive;
747     kid[nkids].w = dup(1);
748     kid[nkids].flags |= PF_FILT_OUT;
749     int rv = open_process(&kid[nkids], NULL);
750     if (!rv) { // in child process?
751     while (nkids-- > 0)
752     close(kid[nkids].w);
753     free(kid); free(kidRow);
754     kid = NULL; kidRow = NULL;
755     RunChild(); // should never return
756     _exit(1);
757     }
758     if (rv < 0) {
759     error(SYSTEM, "cannot fork worker process");
760     return false;
761     }
762     kidRow[nkids++] = -1; // newborn is ready
763     }
764     return true;
765     }
766    
767     // Reap the indicated number of children (all if 0)
768     int
769     RcontribSimulManager::StopKids(int n2end)
770     {
771     if (nkids <= 0)
772     return 0;
773     FlushQueue();
774     int status = 0;
775     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
776     status = close_processes(kid, nkids);
777     free(kid); free(kidRow);
778     kid = NULL; kidRow = NULL;
779     nkids = 0;
780     } else { // else shrink family
781     int st;
782     while (n2end-- > 0)
783     if ((st = close_process(&kid[--nkids])) > 0)
784     status = st;
785     // could call realloc(), but it hardly matters
786     }
787     if (status) {
788     sprintf(errmsg, "non-zero (%d) status from child", status);
789     error(WARNING, errmsg);
790     }
791     return status;
792     }
793    
794     // Set number of computation threads (0 => #cores)
795     int
796     RcontribSimulManager::SetThreadCount(int nt)
797     {
798     if (!Ready()) {
799     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
800     return 0;
801     }
802     if (nt < 0)
803     return nkids;
804     if (!nt) nt = GetNCores();
805     int status = 0;
806     if (nt == 1)
807     status = StopKids();
808     else if (nt < nkids)
809     status = StopKids(nkids-nt);
810     else if (nt > nkids)
811     StartKids(nt-nkids);
812     if (status) {
813     sprintf(errmsg, "non-zero (%d) status from child", status);
814     error(WARNING, errmsg);
815     }
816     return nkids;
817     }