ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.7
Committed: Wed Nov 6 19:45:59 2024 UTC (6 months, 1 week ago) by greg
Branch: MAIN
Changes since 2.6: +12 -13 lines
Log Message:
feat(rxcontrib): Folded resolution and contrib flag into main class

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.7 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.6 2024/11/06 18:28:52 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
// Context name handed to worldfunc() before a modifier's bin expression is evaluated
21     char RCCONTEXT[] = "RC.";
22    
// Header strings defined in another translation unit; all three are
// written into output headers by RcontribOutput::NewHeader()
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27     // RdataShare open flags, indexed by RCOutputOp in defDataShare():
//	new/exclusive, overwrite if exists, or recover data
28     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29     RDSread|RDSwrite};
30    
// Placeholder row-count line copied into every new header
31     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
// Offset of the digits within ROWZEROSTR (length of "NROWS="), added to
// the line's position to get rowCountPos
32     #define LNROWSTR 6
33    
34     // Modifier channel for recording contributions (no constructor/destructor)
35     struct RcontribMod {
36     RcontribOutput * opl; // pointer to first output channel
37     char * params; // parameters string
38 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
39 greg 2.1 int nbins; // bin count this modifier
40     int coffset; // column offset in bytes
41     DCOLORV cbin[1]; // bin accumulator (extends struct)
42     /// Access specific (spectral) color bin
43     DCOLORV * operator[](int n) {
44     if ((n < 0) | (n >= nbins)) return NULL;
45     return cbin + n*NCSAMP;
46     }
47     };
48    
49 greg 2.2 // Struct used to assign record calculation to child.
// The parent writes one of these to a child's pipe in ComputeRecord(),
// followed by the ray origins/directions; RunChild() reads it back.
50 greg 2.1 struct RowAssignment {
51     uint32 row; // row (record index) the child should compute
52     uint32 ac; // accumulation count: number of origin/direction pairs that follow
53     };
54    
55     // Our default data share function
56     RdataShare *
57     defDataShare(const char *name, RCOutputOp op, size_t siz)
58     {
59     return new RdataShareMap(name, RSDOflags[op], siz);
60     }
61    
62     // Allocate rcontrib accumulator
63     RcontribMod *
64     NewRcMod(const char *prms, const char *binexpr, int ncbins)
65     {
66     if (ncbins <= 0) return NULL;
67     if (!prms) prms = "";
68 greg 2.4 if (!binexpr & (ncbins > 1)) {
69     error(INTERNAL, "missing bin expression");
70     return NULL;
71     }
72     if (ncbins == 1) { // shouldn't have bin expression?
73     if (binexpr && strcmp(binexpr, "0"))
74     error(WARNING, "ignoring non-zero expression for single bin");
75     prms = "";
76     binexpr = NULL;
77     }
78 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
79     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
80     strlen(prms)+1);
81    
82     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
83 greg 2.4 if (binexpr) {
84     mp->binv = eparse(const_cast<char *>(binexpr));
85     CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
86     }
87 greg 2.1 mp->nbins = ncbins;
88     return mp;
89     }
90    
91     // Free an RcontribMod
92     void
93     FreeRcMod(void *p)
94     {
95     if (!p) return;
96 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
97     if (bep) epfree(bep, true);
98 greg 2.1 efree(p);
99     }
100    
101     // Set output format ('f', 'd', or 'c')
102     bool
103     RcontribSimulManager::SetDataFormat(int ty)
104     {
105     if (outList) {
106     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
107     return false;
108     }
109     switch (ty) {
110     case 'f':
111     dsiz = sizeof(float)*NCSAMP;
112     break;
113     case 'd':
114     dsiz = sizeof(double)*NCSAMP;
115     break;
116     case 'c':
117     dsiz = LSCOLR;
118     break;
119     default:
120     sprintf(errmsg, "unsupported output format '%c'", ty);
121     error(INTERNAL, errmsg);
122     return false;
123     }
124     dtyp = ty;
125     return true;
126     }
127    
128     // static call-back for rcontrib ray-tracing
129     int
130     RcontribSimulManager::RctCall(RAY *r, void *cd)
131     {
132     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
133     return 0;
134     // shadow ray not on source?
135     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
136     return 0;
137    
138     const char * mname = objptr(r->ro->omod)->oname;
139     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
140     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
141     if (!mp)
142     return 0; // not in our modifier list
143    
144 greg 2.4 int bi = 0; // get bin index
145     if (mp->binv) {
146 greg 2.5 worldfunc(RCCONTEXT, r);
147 greg 2.4 set_eparams(mp->params);
148     double bval = evalue(mp->binv);
149     if (bval <= -.5)
150     return 0; // silently ignore negative bin index
151     bi = int(bval + .5);
152     }
153     DCOLORV * dvp = (*mp)[bi];
154 greg 2.1 if (!dvp) {
155 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
156 greg 2.1 error(WARNING, errmsg);
157     return 0;
158     }
159     SCOLOR contr;
160     raycontrib(contr, r, PRIMARY); // compute coefficient
161 greg 2.7 if (rcp->HasFlag(RCcontrib))
162 greg 2.1 smultscolor(contr, r->rcol); // -> value contribution
163     for (int i = 0; i < NCSAMP; i++)
164     *dvp++ += contr[i]; // accumulate color/spectrum
165     return 1;
166     }
167    
// Look for a printf-style conversion in tst whose type character is one
// of those in match.  Returns the index of that type character within
// tst, or 0 if there is no such conversion ("%%" is not a conversion).
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; *cp; ) {
		if (*cp++ != '%')
			continue;
					// skip flags/width up to type char or NUL
		cp += strcspn(cp, convChars);
		if (strchr(match, *cp))	// also true at the terminating NUL
			break;
		cp++;
	}
	return *cp ? int(cp - tst) : 0;
}
185    
186     // Add a modifier and arguments, opening output file(s)
187     bool
188     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
189     const char *prms, const char *binval, int bincnt)
190     {
191     if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
192     error(WARNING, "ignoring bad call to AddModifier()");
193     return false;
194     }
195     if (!nChan) { // initial call?
196 greg 2.7 if ((xres < 0) | (yres <= 0)) {
197     error(INTERNAL, "xres, yres must be set before first modifier");
198     return false;
199     }
200 greg 2.1 if (!SetDataFormat(dtyp))
201     return false;
202     nChan = NCSAMP;
203     } else if (nChan != NCSAMP) {
204     error(INTERNAL, "number of spectral channels must be fixed");
205     return false;
206     }
207     if (Ready()) {
208     error(INTERNAL, "call to AddModifier() after PrepOutput()");
209     return false;
210     }
211     LUENT * lp = lu_find(&modLUT, modn);
212     if (lp->data) {
213     sprintf(errmsg, "duplicate modifier '%s'", modn);
214     error(USER, errmsg);
215     return false;
216     }
217     if (!lp->key) // create new entry
218     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
219    
220     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
221     if (!mp) return false;
222     lp->data = (char *)mp;
223     const int modndx = hasFormat(outspec, "sb");
224     const int binndx = hasFormat(outspec, "diouxX");
225     int bin0 = 0;
226     char fnbuf[512];
227     if (!modndx | (modndx > binndx))
228     sprintf(fnbuf, outspec, bin0, modn);
229     else
230     sprintf(fnbuf, outspec, modn, bin0);
231     RcontribOutput * olast = NULL;
232     RcontribOutput * op;
233     for (op = outList; op; op = (olast=op)->next) {
234     if (strcmp(op->GetName(), fnbuf))
235     continue;
236     if (modndx) { // this ain't right
237     sprintf(errmsg, "output name collision for '%s'", fnbuf);
238     error(USER, errmsg);
239     return false;
240     }
241     if ((binndx > 0) & (xres > 0)) {
242     sprintf(fnbuf, outspec, ++bin0);
243     continue; // each bin goes to another image
244     }
245     mp->coffset = op->rowBytes; // else add to what's there
246     break;
247     }
248     if (!op) { // create new output channel?
249     op = new RcontribOutput(fnbuf);
250     if (olast) olast->next = op;
251     else outList = op;
252     }
253     mp->opl = op; // first (maybe only) output channel
254     if (modndx) // remember modifier if part of name
255     op->omod = lp->key;
256     if (binndx > 0) { // append output image/bin list
257     op->rowBytes += dsiz;
258     op->obin = bin0;
259     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
260     if (!modndx | (modndx > binndx))
261     sprintf(fnbuf, outspec, bin0+bincnt, modn);
262     else
263     sprintf(fnbuf, outspec, modn, bin0+bincnt);
264     if (!op->next) {
265     olast = op;
266     olast->next = op = new RcontribOutput(fnbuf);
267     if (modndx) op->omod = lp->key;
268     op->obin = bin0+bincnt;
269     } else {
270     op = op->next;
271     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
272     "bin number mismatch in AddModifier()");
273     }
274     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
275     "row offset mismatch in AddModifier()");
276     op->rowBytes += dsiz;
277     }
278     } else // else send all results to this channel
279     op->rowBytes += bincnt*dsiz;
280     return true;
281     }
282    
283     // Add a file of modifiers with associated arguments
284     bool
285     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
286     const char *prms,
287     const char *binval, int bincnt)
288     {
289     char * path = getpath(const_cast<char *>(modfn),
290     getrlibpath(), R_OK);
291     FILE * fp;
292     if (!path || !(fp = fopen(path, "r"))) {
293     sprintf(errmsg, "cannot %s modifier file '%s'",
294     path ? "open" : "find", modfn);
295     error(SYSTEM, errmsg);
296     return false;
297     }
298     char mod[MAXSTR];
299     while (fgetword(mod, sizeof(mod), fp))
300     if (!AddModifier(mod, outspec, prms, binval, bincnt))
301     return false;
302     fclose(fp);
303     return true;
304     }
305    
306 greg 2.4 // call-back to check if modifier has been loaded
307     static int
308     checkModExists(const LUENT *lp, void *p)
309     {
310     if (modifier(lp->key) != OVOID)
311     return 1;
312    
313     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
314     error(WARNING, errmsg);
315     return 0;
316     }
317    
318 greg 2.1 // Prepare output channels and return # completed rows
319     int
320     RcontribSimulManager::PrepOutput()
321     {
322     if (!outList || !RtraceSimulManager::Ready()) {
323     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
324     return -1;
325     }
326 greg 2.5 if (!cdsF) {
327     error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
328     return -1;
329     }
330 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
331     return -1;
332    
333 greg 2.1 int remWarnings = 20;
334     for (RcontribOutput *op = outList; op; op = op->next) {
335     if (op->rData) {
336     error(INTERNAL, "output channel already open in PrepOutput()");
337     return -1;
338     }
339     op->nRows = yres * (xres + !xres);
340     op->rData = (*cdsF)(op->ofname, outOp,
341     GetHeadLen()+1024 + op->nRows*op->rowBytes);
342     freeqstr(op->ofname); op->ofname = NULL;
343     if (outOp == RCOrecover) {
344     int rd = op->CheckHeader(this);
345     if (rd < 0)
346     return -1;
347     if (rd >= op->nRows) {
348     if (remWarnings >= 0) {
349     sprintf(errmsg, "recovered output '%s' already done",
350     op->GetName());
351     error(WARNING, remWarnings ? errmsg : "etc...");
352     remWarnings--;
353     }
354     rd = op->nRows;
355     }
356     if (!rInPos | (rInPos > rd)) rInPos = rd;
357     } else if (!op->NewHeader(this))
358     return -1;
359     // make sure there's room
360     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
361     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
362     return -1; // calls error() for us
363     }
364     rowsDone.NewBitMap(outList->nRows); // create row completion map
365     rowsDone.ClearBits(0, rInPos, true);
366     return rInPos;
367     }
368    
369     // Create header in open write-only channel
370     bool
371     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
372     {
373     int headlim = rcp->GetHeadLen() + 1024;
374     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
375     int esiz;
376     const int etyp = rcp->GetFormat(&esiz);
377    
378     strcpy(hdr, HDRSTR);
379     strcat(hdr, "RADIANCE\n");
380     begData = strlen(hdr);
381     strcpy(hdr+begData, rcp->GetHeadStr());
382     begData += rcp->GetHeadLen();
383     if (omod) {
384     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
385     begData += strlen(hdr+begData);
386     }
387     if (obin >= 0) {
388     sprintf(hdr+begData, "BIN=%d\n", obin);
389     begData += strlen(hdr+begData);
390     }
391     strcpy(hdr+begData, ROWZEROSTR);
392     rowCountPos = begData+LNROWSTR;
393     begData += sizeof(ROWZEROSTR)-1;
394 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
395 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
396     begData += strlen(hdr+begData);
397     }
398     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
399     begData += strlen(hdr+begData);
400     if (NCSAMP > 3) {
401     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
402     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
403     begData += strlen(hdr+begData);
404     }
405     if (etyp != 'c') {
406     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
407     begData += strlen(hdr+begData);
408     }
409     int align = 0;
410     switch (etyp) {
411     case 'f':
412     align = sizeof(float);
413     break;
414     case 'd':
415     align = sizeof(double);
416     break;
417     case 'c':
418     break;
419     default:
420     error(INTERNAL, "unsupported data type in NewHeader()");
421     return false;
422     }
423     strcpy(hdr+begData, FMTSTR); // write format string
424     begData += strlen(hdr+begData);
425     strcpy(hdr+begData, formstr(etyp));
426     begData += strlen(hdr+begData);
427     if (align) // align data at end of header
428     while ((begData+2) % align)
429     hdr[begData++] = ' ';
430     hdr[begData++] = '\n'; // EOL for data format
431     hdr[begData++] = '\n'; // end of nominal header
432 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
433     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
434 greg 2.1 begData += strlen(hdr+begData);
435     }
436     return rData->ReleaseMemory(hdr, RDSwrite);
437     }
438    
439     // find string argument in header for named variable
440     static const char *
441     findArgs(const char *hdr, const char *vnm, int len)
442     {
443     const char * npos = strnstr(hdr, vnm, len);
444    
445     if (!npos) return NULL;
446    
447     npos += strlen(vnm); // find start of (first) argument
448     len -= npos - hdr;
449     while (len-- > 0 && (*npos == '=') | isspace(*npos))
450     if (*npos++ == '\n')
451     return NULL;
452    
453     return (len >= 0) ? npos : NULL;
454     }
455    
456     // Load and check header in read/write channel
457     int
458     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
459     {
460     int esiz;
461     const int etyp = rcp->GetFormat(&esiz);
462     const int maxlen = rcp->GetHeadLen() + 1024;
463     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
464     const char * cp;
465     // find end of header
466     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
467     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
468     error(USER, errmsg);
469     return -1;
470     }
471     begData = cp - hdr + 1; // increment again at end
472     // check # components
473     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
474     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
475     error(USER, errmsg);
476     return -1;
477     }
478     // check format
479 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
480     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
481 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
482     error(USER, errmsg);
483     return -1;
484     }
485     // check #columns
486     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
487 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
488 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
489     int(rowBytes/esiz), GetName());
490     error(USER, errmsg);
491     return -1;
492     }
493     // find row count
494     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
495     sprintf(errmsg, "missing NROWS in '%s'", GetName());
496     error(USER, errmsg);
497     return -1;
498     }
499     rowCountPos = cp - hdr;
500     int rlast = atoi(cp);
501     begData++; // advance past closing EOL
502 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
503 greg 2.1 char rbuf[64];
504 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
505 greg 2.1 int rlen = strlen(rbuf);
506     if (strncmp(rbuf, hdr+begData, rlen)) {
507     sprintf(errmsg, "bad resolution string in '%s'", GetName());
508     error(USER, errmsg);
509     return -1;
510     }
511     begData += rlen;
512     }
513     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
514     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
515     }
516    
517     // Rewind calculation (previous results unchanged)
518     bool
519     RcontribSimulManager::ResetRow(int r)
520     {
521     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
522     error(WARNING, "ignoring bad call to ResetRow()");
523     return (r == rInPos);
524     }
525     FlushQueue(); // finish current and reset
526     for (RcontribOutput *op = outList; op; op = op->next)
527     if (!op->SetRowsDone(r))
528     return false;
529    
530     rowsDone.ClearBits(r, rInPos-r, false);
531     rInPos = r;
532     return true;
533     }
534    
535     // call-back for averaging each modifier's contributions to assigned channel(s)
536     static int
537     putModContrib(const LUENT *lp, void *p)
538     {
539     RcontribMod * mp = (RcontribMod *)lp->data;
540     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
541     const double sca = 1./rcp->accum;
542     const DCOLORV * dvp = mp->cbin;
543     RcontribOutput * op = mp->opl;
544     int i, n;
545    
546     switch (rcp->GetFormat()) { // conversion based on output type
547     case 'd': {
548     double * dvo = (double *)op->InsertionP(mp->coffset);
549     for (n = mp->nbins; n--; ) {
550     for (i = 0; i < NCSAMP; i++)
551     *dvo++ = *dvp++ * sca;
552     if ((op->obin >= 0) & (n > 0))
553     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
554     }
555     } break;
556     case 'f': {
557     float * fvo = (float *)op->InsertionP(mp->coffset);
558     for (n = mp->nbins; n--; ) {
559     for (i = 0; i < NCSAMP; i++)
560     *fvo++ = float(*dvp++ * sca);
561     if ((op->obin >= 0) & (n > 0))
562     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
563     }
564     } break;
565     case 'c': {
566     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
567     for (n = mp->nbins; n--; ) {
568     SCOLOR scol;
569     for (i = 0; i < NCSAMP; i++)
570     scol[i] = COLORV(*dvp++ * sca);
571     scolor_scolr(cvo, scol);
572     cvo += LSCOLR;
573     if ((op->obin >= 0) & (n > 0))
574     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
575     }
576     } break;
577     default:
578     error(CONSISTENCY, "unsupported output type in sendModContrib()");
579     return -1;
580     }
581     // clear for next tally
582     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
583     return 1;
584     }
585    
586     // Add a ray/bundle to compute next record
587     int
588     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
589     {
590     if (!Ready())
591     return 0;
592     if (rInPos >= outList->nRows) {
593     error(WARNING, "ComputeRecord() called after last record");
594     return 0;
595     }
596     if (nkids > 0) { // in parent process?
597     int k = GetChild(); // updates output rows
598     if (k < 0) return -1; // can't really happen
599     RowAssignment rass;
600     rass.row = kidRow[k] = rInPos++;
601     rass.ac = accum;
602     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
603     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
604     != sizeof(FVECT)*2*accum) {
605     error(SYSTEM, "cannot write to child; dead process?");
606     return -1;
607     }
608     return accum; // tracing/output happens in child
609     }
610     if (NCSAMP != nChan) {
611     error(INTERNAL, "number of color channels must remain fixed");
612     return -1;
613     }
614     // actual work is done here...
615     if (EnqueueBundle(orig_direc, accum) < 0)
616     return -1;
617    
618     RcontribOutput * op; // prepare output buffers
619     for (op = outList; op; op = op->next)
620     if (!op->GetRow(rInPos))
621     return -1;
622     // convert averages & clear
623     if (lu_doall(&modLUT, putModContrib, this) < 0)
624     return -1;
625     // write buffers
626     for (op = outList; op; op = op->next)
627     op->DoneRow();
628    
629     if (!nkids) // update row if solo process
630     UpdateRowsDone(rInPos++); // XXX increment here is critical
631    
632     return accum;
633     }
634    
635     // Get next available child, returning index or -1 if forceWait & idling
636     int
637     RcontribSimulManager::GetChild(bool forceWait)
638     {
639     if (nkids <= 0)
640     return -1;
641     // take inventory
642     int pn, n = 0;
643     fd_set writeset, errset;
644     FD_ZERO(&writeset); FD_ZERO(&errset);
645     for (pn = nkids; pn--; ) {
646     if (kidRow[pn] < 0) { // child already ready?
647     if (forceWait) continue;
648     return pn; // good enough
649     }
650     FD_SET(kid[pn].w, &writeset); // will check on this one
651     FD_SET(kid[pn].w, &errset);
652     if (kid[pn].w >= n)
653     n = kid[pn].w + 1;
654     }
655     if (!n) // every child is idle?
656     return -1;
657     // wait on "busy" child(ren)
658     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
659     if (errno != EINTR) {
660     error(SYSTEM, "select call failed in GetChild()");
661     return -1;
662     }
663     pn = -1; // get flags set by select
664     for (n = nkids; n--; )
665     if (kidRow[n] >= 0 &&
666     FD_ISSET(kid[n].w, &writeset) |
667     FD_ISSET(kid[n].w, &errset)) {
668     // update output row counts
669 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
670     UpdateRowsDone(kidRow[n]);
671 greg 2.1 kidRow[n] = -1; // flag it available
672     pn = n;
673     }
674     return pn;
675     }
676    
677     // Update row completion bitmap and update outputs (does not change rInPos)
678     bool
679     RcontribSimulManager::UpdateRowsDone(int r)
680     {
681     if (!rowsDone.TestAndSet(r)) {
682     error(WARNING, "redundant call to UpdateRowsDone()");
683     return false;
684     }
685     int nDone = GetRowFinished();
686     if (nDone <= r)
687     return true; // nothing to update, yet
688     for (RcontribOutput *op = outList; op; op = op->next)
689     if (!op->SetRowsDone(nDone))
690     return false;
691     return true; // up-to-date
692     }
693    
694     // Run rcontrib child process (never returns)
695     void
696     RcontribSimulManager::RunChild()
697     {
698     FVECT * vecList = NULL;
699     RowAssignment rass;
700     ssize_t nr;
701    
702     accum = 0;
703     errno = 0;
704     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
705     if (!rass.ac) {
706     error(CONSISTENCY, "bad accumulator count in child");
707     exit(1);
708     }
709     if (rass.ac > accum)
710     vecList = (FVECT *)erealloc(vecList,
711     sizeof(FVECT)*2*rass.ac);
712     accum = rass.ac;
713     rInPos = rass.row;
714    
715     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
716     sizeof(FVECT)*2*accum)
717     break;
718    
719     if (ComputeRecord(vecList) <= 0)
720     exit(1);
721     }
722     if (nr) {
723     error(SYSTEM, "read error in child process");
724     exit(1);
725     }
726     exit(0);
727     }
728    
729     // Add child processes as indicated
730     bool
731     RcontribSimulManager::StartKids(int n2go)
732     {
733     if ((n2go <= 0) | (nkids + n2go <= 1))
734     return false;
735    
736     if (!nkids)
737     cow_memshare(); // preload objects
738    
739     n2go += nkids; // => desired brood size
740     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
741     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
742    
743     fflush(stdout); // shouldn't use, anyway
744     while (nkids < n2go) {
745     kid[nkids] = sp_inactive;
746     kid[nkids].w = dup(1);
747     kid[nkids].flags |= PF_FILT_OUT;
748     int rv = open_process(&kid[nkids], NULL);
749     if (!rv) { // in child process?
750     while (nkids-- > 0)
751     close(kid[nkids].w);
752     free(kid); free(kidRow);
753     kid = NULL; kidRow = NULL;
754     RunChild(); // should never return
755     _exit(1);
756     }
757     if (rv < 0) {
758     error(SYSTEM, "cannot fork worker process");
759     return false;
760     }
761     kidRow[nkids++] = -1; // newborn is ready
762     }
763     return true;
764     }
765    
766     // Reap the indicated number of children (all if 0)
767     int
768     RcontribSimulManager::StopKids(int n2end)
769     {
770     if (nkids <= 0)
771     return 0;
772     FlushQueue();
773     int status = 0;
774     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
775     status = close_processes(kid, nkids);
776     free(kid); free(kidRow);
777     kid = NULL; kidRow = NULL;
778     nkids = 0;
779     } else { // else shrink family
780     int st;
781     while (n2end-- > 0)
782     if ((st = close_process(&kid[--nkids])) > 0)
783     status = st;
784     // could call realloc(), but it hardly matters
785     }
786     if (status) {
787     sprintf(errmsg, "non-zero (%d) status from child", status);
788     error(WARNING, errmsg);
789     }
790     return status;
791     }
792    
793     // Set number of computation threads (0 => #cores)
794     int
795     RcontribSimulManager::SetThreadCount(int nt)
796     {
797     if (!Ready()) {
798     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
799     return 0;
800     }
801     if (nt < 0)
802 greg 2.6 return NThreads();
803 greg 2.1 if (!nt) nt = GetNCores();
804     int status = 0;
805     if (nt == 1)
806     status = StopKids();
807     else if (nt < nkids)
808     status = StopKids(nkids-nt);
809     else if (nt > nkids)
810     StartKids(nt-nkids);
811     if (status) {
812     sprintf(errmsg, "non-zero (%d) status from child", status);
813     error(WARNING, errmsg);
814     }
815 greg 2.6 return NThreads();
816 greg 2.1 }