ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.4
Committed: Fri Nov 1 16:17:33 2024 UTC (6 months, 2 weeks ago) by greg
Branch: MAIN
Changes since 2.3: +44 -14 lines
Log Message:
feat(rxcontrib): more robust checking of bin expressions, etc.

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.4 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.3 2024/10/30 01:38:21 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27 greg 2.2 int contrib = 0; // computing contributions?
28    
29     int xres = 0; // horizontal (scan) size
30     int yres = 0; // vertical resolution
31 greg 2.1
32     // new/exclusive, overwrite if exists, or recover data
33     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
34     RDSread|RDSwrite};
35    
36     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
37     #define LNROWSTR 6
38    
39     // Modifier channel for recording contributions (no constructor/destructor)
40     struct RcontribMod {
41     RcontribOutput * opl; // pointer to first output channel
42     char * params; // parameters string
43 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
44 greg 2.1 int nbins; // bin count this modifier
45     int coffset; // column offset in bytes
46     DCOLORV cbin[1]; // bin accumulator (extends struct)
47     /// Access specific (spectral) color bin
48     DCOLORV * operator[](int n) {
49     if ((n < 0) | (n >= nbins)) return NULL;
50     return cbin + n*NCSAMP;
51     }
52     };
53    
54 greg 2.2 // Struct used to assign record calculation to child
55 greg 2.1 struct RowAssignment {
56     uint32 row; // row to do
57     uint32 ac; // accumulation count
58     };
59    
60     // Our default data share function
61     RdataShare *
62     defDataShare(const char *name, RCOutputOp op, size_t siz)
63     {
64     return new RdataShareMap(name, RSDOflags[op], siz);
65     }
66    
67     // Allocate rcontrib accumulator
68     RcontribMod *
69     NewRcMod(const char *prms, const char *binexpr, int ncbins)
70     {
71     if (ncbins <= 0) return NULL;
72     if (!prms) prms = "";
73 greg 2.4 if (!binexpr & (ncbins > 1)) {
74     error(INTERNAL, "missing bin expression");
75     return NULL;
76     }
77     if (ncbins == 1) { // shouldn't have bin expression?
78     if (binexpr && strcmp(binexpr, "0"))
79     error(WARNING, "ignoring non-zero expression for single bin");
80     prms = "";
81     binexpr = NULL;
82     }
83 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
84     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
85     strlen(prms)+1);
86    
87     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
88 greg 2.4 if (binexpr) {
89     mp->binv = eparse(const_cast<char *>(binexpr));
90     CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
91     }
92 greg 2.1 mp->nbins = ncbins;
93     return mp;
94     }
95    
96     // Free an RcontribMod
97     void
98     FreeRcMod(void *p)
99     {
100     if (!p) return;
101 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
102     if (bep) epfree(bep, true);
103 greg 2.1 efree(p);
104     }
105    
106     // Set output format ('f', 'd', or 'c')
107     bool
108     RcontribSimulManager::SetDataFormat(int ty)
109     {
110     if (outList) {
111     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
112     return false;
113     }
114     switch (ty) {
115     case 'f':
116     dsiz = sizeof(float)*NCSAMP;
117     break;
118     case 'd':
119     dsiz = sizeof(double)*NCSAMP;
120     break;
121     case 'c':
122     dsiz = LSCOLR;
123     break;
124     default:
125     sprintf(errmsg, "unsupported output format '%c'", ty);
126     error(INTERNAL, errmsg);
127     return false;
128     }
129     dtyp = ty;
130     return true;
131     }
132    
133     // static call-back for rcontrib ray-tracing
134     int
135     RcontribSimulManager::RctCall(RAY *r, void *cd)
136     {
137     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
138     return 0;
139     // shadow ray not on source?
140     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
141     return 0;
142    
143     const char * mname = objptr(r->ro->omod)->oname;
144     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
145     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
146     if (!mp)
147     return 0; // not in our modifier list
148    
149 greg 2.4 int bi = 0; // get bin index
150     if (mp->binv) {
151     worldfunc(RCCONTEXT, r); // compute bin #
152     set_eparams(mp->params);
153     double bval = evalue(mp->binv);
154     if (bval <= -.5)
155     return 0; // silently ignore negative bin index
156     bi = int(bval + .5);
157     }
158     DCOLORV * dvp = (*mp)[bi];
159 greg 2.1 if (!dvp) {
160 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
161 greg 2.1 error(WARNING, errmsg);
162     return 0;
163     }
164     SCOLOR contr;
165     raycontrib(contr, r, PRIMARY); // compute coefficient
166     if (contrib)
167     smultscolor(contr, r->rcol); // -> value contribution
168     for (int i = 0; i < NCSAMP; i++)
169     *dvp++ += contr[i]; // accumulate color/spectrum
170     return 1;
171     }
172    
// Check for given format type in string, return last char position.
// Scans tst for printf-style conversions and returns the index of the
// conversion character of the first one whose type is listed in match,
// or 0 if there is none (a "%%" pair is skipped as a literal percent).
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";

	for (const char *cp = tst; *cp; ) {
		if (*cp++ != '%')
			continue;
					// skip flags, width, precision, etc.
		cp += strcspn(cp, convChars);
		if (!*cp)		// string ended inside a conversion
			return 0;
		if (strchr(match, *cp))
			return int(cp - tst);
		++cp;			// some other conversion; keep looking
	}
	return 0;
}
190    
191     // Add a modifier and arguments, opening output file(s)
192     bool
193     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
194     const char *prms, const char *binval, int bincnt)
195     {
196     if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
197     error(WARNING, "ignoring bad call to AddModifier()");
198     return false;
199     }
200     if (!nChan) { // initial call?
201     if (!SetDataFormat(dtyp))
202     return false;
203     nChan = NCSAMP;
204     } else if (nChan != NCSAMP) {
205     error(INTERNAL, "number of spectral channels must be fixed");
206     return false;
207     }
208     if (Ready()) {
209     error(INTERNAL, "call to AddModifier() after PrepOutput()");
210     return false;
211     }
212     LUENT * lp = lu_find(&modLUT, modn);
213     if (lp->data) {
214     sprintf(errmsg, "duplicate modifier '%s'", modn);
215     error(USER, errmsg);
216     return false;
217     }
218     if (!lp->key) // create new entry
219     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
220    
221     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
222     if (!mp) return false;
223     lp->data = (char *)mp;
224     const int modndx = hasFormat(outspec, "sb");
225     const int binndx = hasFormat(outspec, "diouxX");
226     int bin0 = 0;
227     char fnbuf[512];
228     if (!modndx | (modndx > binndx))
229     sprintf(fnbuf, outspec, bin0, modn);
230     else
231     sprintf(fnbuf, outspec, modn, bin0);
232     RcontribOutput * olast = NULL;
233     RcontribOutput * op;
234     for (op = outList; op; op = (olast=op)->next) {
235     if (strcmp(op->GetName(), fnbuf))
236     continue;
237     if (modndx) { // this ain't right
238     sprintf(errmsg, "output name collision for '%s'", fnbuf);
239     error(USER, errmsg);
240     return false;
241     }
242     if ((binndx > 0) & (xres > 0)) {
243     sprintf(fnbuf, outspec, ++bin0);
244     continue; // each bin goes to another image
245     }
246     mp->coffset = op->rowBytes; // else add to what's there
247     break;
248     }
249     if (!op) { // create new output channel?
250     op = new RcontribOutput(fnbuf);
251     if (olast) olast->next = op;
252     else outList = op;
253     }
254     mp->opl = op; // first (maybe only) output channel
255     if (modndx) // remember modifier if part of name
256     op->omod = lp->key;
257     if (binndx > 0) { // append output image/bin list
258     op->rowBytes += dsiz;
259     op->obin = bin0;
260     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
261     if (!modndx | (modndx > binndx))
262     sprintf(fnbuf, outspec, bin0+bincnt, modn);
263     else
264     sprintf(fnbuf, outspec, modn, bin0+bincnt);
265     if (!op->next) {
266     olast = op;
267     olast->next = op = new RcontribOutput(fnbuf);
268     if (modndx) op->omod = lp->key;
269     op->obin = bin0+bincnt;
270     } else {
271     op = op->next;
272     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
273     "bin number mismatch in AddModifier()");
274     }
275     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
276     "row offset mismatch in AddModifier()");
277     op->rowBytes += dsiz;
278     }
279     } else // else send all results to this channel
280     op->rowBytes += bincnt*dsiz;
281     return true;
282     }
283    
284     // Add a file of modifiers with associated arguments
285     bool
286     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
287     const char *prms,
288     const char *binval, int bincnt)
289     {
290     char * path = getpath(const_cast<char *>(modfn),
291     getrlibpath(), R_OK);
292     FILE * fp;
293     if (!path || !(fp = fopen(path, "r"))) {
294     sprintf(errmsg, "cannot %s modifier file '%s'",
295     path ? "open" : "find", modfn);
296     error(SYSTEM, errmsg);
297     return false;
298     }
299     char mod[MAXSTR];
300     while (fgetword(mod, sizeof(mod), fp))
301     if (!AddModifier(mod, outspec, prms, binval, bincnt))
302     return false;
303     fclose(fp);
304     return true;
305     }
306    
307 greg 2.4 // call-back to check if modifier has been loaded
308     static int
309     checkModExists(const LUENT *lp, void *p)
310     {
311     if (modifier(lp->key) != OVOID)
312     return 1;
313    
314     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
315     error(WARNING, errmsg);
316     return 0;
317     }
318    
319 greg 2.1 // Prepare output channels and return # completed rows
320     int
321     RcontribSimulManager::PrepOutput()
322     {
323     if (!outList || !RtraceSimulManager::Ready()) {
324     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
325     return -1;
326     }
327 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
328     return -1;
329    
330 greg 2.1 int remWarnings = 20;
331     for (RcontribOutput *op = outList; op; op = op->next) {
332     if (op->rData) {
333     error(INTERNAL, "output channel already open in PrepOutput()");
334     return -1;
335     }
336     op->nRows = yres * (xres + !xres);
337     op->rData = (*cdsF)(op->ofname, outOp,
338     GetHeadLen()+1024 + op->nRows*op->rowBytes);
339     freeqstr(op->ofname); op->ofname = NULL;
340     if (outOp == RCOrecover) {
341     int rd = op->CheckHeader(this);
342     if (rd < 0)
343     return -1;
344     if (rd >= op->nRows) {
345     if (remWarnings >= 0) {
346     sprintf(errmsg, "recovered output '%s' already done",
347     op->GetName());
348     error(WARNING, remWarnings ? errmsg : "etc...");
349     remWarnings--;
350     }
351     rd = op->nRows;
352     }
353     if (!rInPos | (rInPos > rd)) rInPos = rd;
354     } else if (!op->NewHeader(this))
355     return -1;
356     // make sure there's room
357     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
358     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
359     return -1; // calls error() for us
360     }
361     rowsDone.NewBitMap(outList->nRows); // create row completion map
362     rowsDone.ClearBits(0, rInPos, true);
363     return rInPos;
364     }
365    
366     // Create header in open write-only channel
367     bool
368     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
369     {
370     int headlim = rcp->GetHeadLen() + 1024;
371     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
372     int esiz;
373     const int etyp = rcp->GetFormat(&esiz);
374    
375     strcpy(hdr, HDRSTR);
376     strcat(hdr, "RADIANCE\n");
377     begData = strlen(hdr);
378     strcpy(hdr+begData, rcp->GetHeadStr());
379     begData += rcp->GetHeadLen();
380     if (omod) {
381     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
382     begData += strlen(hdr+begData);
383     }
384     if (obin >= 0) {
385     sprintf(hdr+begData, "BIN=%d\n", obin);
386     begData += strlen(hdr+begData);
387     }
388     strcpy(hdr+begData, ROWZEROSTR);
389     rowCountPos = begData+LNROWSTR;
390     begData += sizeof(ROWZEROSTR)-1;
391     if (!xres | (rowBytes > esiz)) {
392     sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
393     begData += strlen(hdr+begData);
394     }
395     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
396     begData += strlen(hdr+begData);
397     if (NCSAMP > 3) {
398     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
399     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
400     begData += strlen(hdr+begData);
401     }
402     if (etyp != 'c') {
403     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
404     begData += strlen(hdr+begData);
405     }
406     int align = 0;
407     switch (etyp) {
408     case 'f':
409     align = sizeof(float);
410     break;
411     case 'd':
412     align = sizeof(double);
413     break;
414     case 'c':
415     break;
416     default:
417     error(INTERNAL, "unsupported data type in NewHeader()");
418     return false;
419     }
420     strcpy(hdr+begData, FMTSTR); // write format string
421     begData += strlen(hdr+begData);
422     strcpy(hdr+begData, formstr(etyp));
423     begData += strlen(hdr+begData);
424     if (align) // align data at end of header
425     while ((begData+2) % align)
426     hdr[begData++] = ' ';
427     hdr[begData++] = '\n'; // EOL for data format
428     hdr[begData++] = '\n'; // end of nominal header
429     if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
430     sprintf(hdr+begData, PIXSTDFMT, yres, xres);
431     begData += strlen(hdr+begData);
432     }
433     return rData->ReleaseMemory(hdr, RDSwrite);
434     }
435    
436     // find string argument in header for named variable
437     static const char *
438     findArgs(const char *hdr, const char *vnm, int len)
439     {
440     const char * npos = strnstr(hdr, vnm, len);
441    
442     if (!npos) return NULL;
443    
444     npos += strlen(vnm); // find start of (first) argument
445     len -= npos - hdr;
446     while (len-- > 0 && (*npos == '=') | isspace(*npos))
447     if (*npos++ == '\n')
448     return NULL;
449    
450     return (len >= 0) ? npos : NULL;
451     }
452    
453     // Load and check header in read/write channel
454     int
455     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
456     {
457     int esiz;
458     const int etyp = rcp->GetFormat(&esiz);
459     const int maxlen = rcp->GetHeadLen() + 1024;
460     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
461     const char * cp;
462     // find end of header
463     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
464     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
465     error(USER, errmsg);
466     return -1;
467     }
468     begData = cp - hdr + 1; // increment again at end
469     // check # components
470     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
471     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
472     error(USER, errmsg);
473     return -1;
474     }
475     // check format
476 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
477     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
478 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
479     error(USER, errmsg);
480     return -1;
481     }
482     // check #columns
483     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
484     !xres | (rowBytes > esiz)) {
485     sprintf(errmsg, "expected NCOLS=%d in '%s'",
486     int(rowBytes/esiz), GetName());
487     error(USER, errmsg);
488     return -1;
489     }
490     // find row count
491     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
492     sprintf(errmsg, "missing NROWS in '%s'", GetName());
493     error(USER, errmsg);
494     return -1;
495     }
496     rowCountPos = cp - hdr;
497     int rlast = atoi(cp);
498     begData++; // advance past closing EOL
499     if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
500     char rbuf[64];
501     sprintf(rbuf, PIXSTDFMT, yres, xres);
502     int rlen = strlen(rbuf);
503     if (strncmp(rbuf, hdr+begData, rlen)) {
504     sprintf(errmsg, "bad resolution string in '%s'", GetName());
505     error(USER, errmsg);
506     return -1;
507     }
508     begData += rlen;
509     }
510     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
511     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
512     }
513    
514     // Rewind calculation (previous results unchanged)
515     bool
516     RcontribSimulManager::ResetRow(int r)
517     {
518     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
519     error(WARNING, "ignoring bad call to ResetRow()");
520     return (r == rInPos);
521     }
522     FlushQueue(); // finish current and reset
523     for (RcontribOutput *op = outList; op; op = op->next)
524     if (!op->SetRowsDone(r))
525     return false;
526    
527     rowsDone.ClearBits(r, rInPos-r, false);
528     rInPos = r;
529     return true;
530     }
531    
532     // call-back for averaging each modifier's contributions to assigned channel(s)
533     static int
534     putModContrib(const LUENT *lp, void *p)
535     {
536     RcontribMod * mp = (RcontribMod *)lp->data;
537     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
538     const double sca = 1./rcp->accum;
539     const DCOLORV * dvp = mp->cbin;
540     RcontribOutput * op = mp->opl;
541     int i, n;
542    
543     switch (rcp->GetFormat()) { // conversion based on output type
544     case 'd': {
545     double * dvo = (double *)op->InsertionP(mp->coffset);
546     for (n = mp->nbins; n--; ) {
547     for (i = 0; i < NCSAMP; i++)
548     *dvo++ = *dvp++ * sca;
549     if ((op->obin >= 0) & (n > 0))
550     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
551     }
552     } break;
553     case 'f': {
554     float * fvo = (float *)op->InsertionP(mp->coffset);
555     for (n = mp->nbins; n--; ) {
556     for (i = 0; i < NCSAMP; i++)
557     *fvo++ = float(*dvp++ * sca);
558     if ((op->obin >= 0) & (n > 0))
559     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
560     }
561     } break;
562     case 'c': {
563     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
564     for (n = mp->nbins; n--; ) {
565     SCOLOR scol;
566     for (i = 0; i < NCSAMP; i++)
567     scol[i] = COLORV(*dvp++ * sca);
568     scolor_scolr(cvo, scol);
569     cvo += LSCOLR;
570     if ((op->obin >= 0) & (n > 0))
571     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
572     }
573     } break;
574     default:
575     error(CONSISTENCY, "unsupported output type in sendModContrib()");
576     return -1;
577     }
578     // clear for next tally
579     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
580     return 1;
581     }
582    
583     // Add a ray/bundle to compute next record
584     int
585     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
586     {
587     if (!Ready())
588     return 0;
589     if (rInPos >= outList->nRows) {
590     error(WARNING, "ComputeRecord() called after last record");
591     return 0;
592     }
593     if (nkids > 0) { // in parent process?
594     int k = GetChild(); // updates output rows
595     if (k < 0) return -1; // can't really happen
596     RowAssignment rass;
597     rass.row = kidRow[k] = rInPos++;
598     rass.ac = accum;
599     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
600     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
601     != sizeof(FVECT)*2*accum) {
602     error(SYSTEM, "cannot write to child; dead process?");
603     return -1;
604     }
605     return accum; // tracing/output happens in child
606     }
607     if (NCSAMP != nChan) {
608     error(INTERNAL, "number of color channels must remain fixed");
609     return -1;
610     }
611     // actual work is done here...
612     if (EnqueueBundle(orig_direc, accum) < 0)
613     return -1;
614    
615     RcontribOutput * op; // prepare output buffers
616     for (op = outList; op; op = op->next)
617     if (!op->GetRow(rInPos))
618     return -1;
619     // convert averages & clear
620     if (lu_doall(&modLUT, putModContrib, this) < 0)
621     return -1;
622     // write buffers
623     for (op = outList; op; op = op->next)
624     op->DoneRow();
625    
626     if (!nkids) // update row if solo process
627     UpdateRowsDone(rInPos++); // XXX increment here is critical
628    
629     return accum;
630     }
631    
632     // Get next available child, returning index or -1 if forceWait & idling
633     int
634     RcontribSimulManager::GetChild(bool forceWait)
635     {
636     if (nkids <= 0)
637     return -1;
638     // take inventory
639     int pn, n = 0;
640     fd_set writeset, errset;
641     FD_ZERO(&writeset); FD_ZERO(&errset);
642     for (pn = nkids; pn--; ) {
643     if (kidRow[pn] < 0) { // child already ready?
644     if (forceWait) continue;
645     return pn; // good enough
646     }
647     FD_SET(kid[pn].w, &writeset); // will check on this one
648     FD_SET(kid[pn].w, &errset);
649     if (kid[pn].w >= n)
650     n = kid[pn].w + 1;
651     }
652     if (!n) // every child is idle?
653     return -1;
654     // wait on "busy" child(ren)
655     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
656     if (errno != EINTR) {
657     error(SYSTEM, "select call failed in GetChild()");
658     return -1;
659     }
660     pn = -1; // get flags set by select
661     for (n = nkids; n--; )
662     if (kidRow[n] >= 0 &&
663     FD_ISSET(kid[n].w, &writeset) |
664     FD_ISSET(kid[n].w, &errset)) {
665     // update output row counts
666 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
667     UpdateRowsDone(kidRow[n]);
668 greg 2.1 kidRow[n] = -1; // flag it available
669     pn = n;
670     }
671     return pn;
672     }
673    
674     // Update row completion bitmap and update outputs (does not change rInPos)
675     bool
676     RcontribSimulManager::UpdateRowsDone(int r)
677     {
678     if (!rowsDone.TestAndSet(r)) {
679     error(WARNING, "redundant call to UpdateRowsDone()");
680     return false;
681     }
682     int nDone = GetRowFinished();
683     if (nDone <= r)
684     return true; // nothing to update, yet
685     for (RcontribOutput *op = outList; op; op = op->next)
686     if (!op->SetRowsDone(nDone))
687     return false;
688     return true; // up-to-date
689     }
690    
691     // Run rcontrib child process (never returns)
692     void
693     RcontribSimulManager::RunChild()
694     {
695     FVECT * vecList = NULL;
696     RowAssignment rass;
697     ssize_t nr;
698    
699     accum = 0;
700     errno = 0;
701     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
702     if (!rass.ac) {
703     error(CONSISTENCY, "bad accumulator count in child");
704     exit(1);
705     }
706     if (rass.ac > accum)
707     vecList = (FVECT *)erealloc(vecList,
708     sizeof(FVECT)*2*rass.ac);
709     accum = rass.ac;
710     rInPos = rass.row;
711    
712     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
713     sizeof(FVECT)*2*accum)
714     break;
715    
716     if (ComputeRecord(vecList) <= 0)
717     exit(1);
718     }
719     if (nr) {
720     error(SYSTEM, "read error in child process");
721     exit(1);
722     }
723     exit(0);
724     }
725    
726     // Add child processes as indicated
727     bool
728     RcontribSimulManager::StartKids(int n2go)
729     {
730     if ((n2go <= 0) | (nkids + n2go <= 1))
731     return false;
732    
733     if (!nkids)
734     cow_memshare(); // preload objects
735    
736     n2go += nkids; // => desired brood size
737     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
738     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
739    
740     fflush(stdout); // shouldn't use, anyway
741     while (nkids < n2go) {
742     kid[nkids] = sp_inactive;
743     kid[nkids].w = dup(1);
744     kid[nkids].flags |= PF_FILT_OUT;
745     int rv = open_process(&kid[nkids], NULL);
746     if (!rv) { // in child process?
747     while (nkids-- > 0)
748     close(kid[nkids].w);
749     free(kid); free(kidRow);
750     kid = NULL; kidRow = NULL;
751     RunChild(); // should never return
752     _exit(1);
753     }
754     if (rv < 0) {
755     error(SYSTEM, "cannot fork worker process");
756     return false;
757     }
758     kidRow[nkids++] = -1; // newborn is ready
759     }
760     return true;
761     }
762    
763     // Reap the indicated number of children (all if 0)
764     int
765     RcontribSimulManager::StopKids(int n2end)
766     {
767     if (nkids <= 0)
768     return 0;
769     FlushQueue();
770     int status = 0;
771     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
772     status = close_processes(kid, nkids);
773     free(kid); free(kidRow);
774     kid = NULL; kidRow = NULL;
775     nkids = 0;
776     } else { // else shrink family
777     int st;
778     while (n2end-- > 0)
779     if ((st = close_process(&kid[--nkids])) > 0)
780     status = st;
781     // could call realloc(), but it hardly matters
782     }
783     if (status) {
784     sprintf(errmsg, "non-zero (%d) status from child", status);
785     error(WARNING, errmsg);
786     }
787     return status;
788     }
789    
790     // Set number of computation threads (0 => #cores)
791     int
792     RcontribSimulManager::SetThreadCount(int nt)
793     {
794     if (!Ready()) {
795     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
796     return 0;
797     }
798     if (nt < 0)
799     return nkids;
800     if (!nt) nt = GetNCores();
801     int status = 0;
802     if (nt == 1)
803     status = StopKids();
804     else if (nt < nkids)
805     status = StopKids(nkids-nt);
806     else if (nt > nkids)
807     StartKids(nt-nkids);
808     if (status) {
809     sprintf(errmsg, "non-zero (%d) status from child", status);
810     error(WARNING, errmsg);
811     }
812     return nkids;
813     }