ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.9
Committed: Tue Dec 3 17:39:42 2024 UTC (5 months, 2 weeks ago) by greg
Branch: MAIN
Changes since 2.8: +14 -1 lines
Log Message:
refactor(rxcontrib): Moved definition of formstr() to make it more accessible

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.9 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.8 2024/11/07 18:07:43 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27     // new/exclusive, overwrite if exists, or recover data
28     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29     RDSread|RDSwrite};
30    
31     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32     #define LNROWSTR 6
33    
34     // Modifier channel for recording contributions (no constructor/destructor)
35     struct RcontribMod {
36     RcontribOutput * opl; // pointer to first output channel
37     char * params; // parameters string
38 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
39 greg 2.1 int nbins; // bin count this modifier
40     int coffset; // column offset in bytes
41     DCOLORV cbin[1]; // bin accumulator (extends struct)
42     /// Access specific (spectral) color bin
43     DCOLORV * operator[](int n) {
44     if ((n < 0) | (n >= nbins)) return NULL;
45     return cbin + n*NCSAMP;
46     }
47     };
48    
49 greg 2.2 // Struct used to assign record calculation to child
50 greg 2.1 struct RowAssignment {
51     uint32 row; // row to do
52     uint32 ac; // accumulation count
53     };
54    
55 greg 2.9 // Get format identifier
56     const char *
57     formstr(int f)
58     {
59     switch (f) {
60     case 'a': return("ascii");
61     case 'f': return("float");
62     case 'd': return("double");
63     case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64     }
65     return("unknown");
66     }
67    
68 greg 2.1 // Our default data share function
69     RdataShare *
70     defDataShare(const char *name, RCOutputOp op, size_t siz)
71     {
72     return new RdataShareMap(name, RSDOflags[op], siz);
73     }
74    
75     // Allocate rcontrib accumulator
76     RcontribMod *
77     NewRcMod(const char *prms, const char *binexpr, int ncbins)
78     {
79     if (!prms) prms = "";
80 greg 2.4 if (!binexpr & (ncbins > 1)) {
81 greg 2.8 error(USER, "missing bin expression");
82 greg 2.4 return NULL;
83     }
84 greg 2.8 if (ncbins <= 1) { // shouldn't have bin expression?
85 greg 2.4 if (binexpr && strcmp(binexpr, "0"))
86     error(WARNING, "ignoring non-zero expression for single bin");
87     prms = "";
88     binexpr = NULL;
89 greg 2.8 ncbins = 1;
90 greg 2.4 }
91 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
92     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
93     strlen(prms)+1);
94    
95     mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
96 greg 2.4 if (binexpr) {
97     mp->binv = eparse(const_cast<char *>(binexpr));
98     CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
99     }
100 greg 2.1 mp->nbins = ncbins;
101     return mp;
102     }
103    
104     // Free an RcontribMod
105     void
106     FreeRcMod(void *p)
107     {
108     if (!p) return;
109 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
110     if (bep) epfree(bep, true);
111 greg 2.1 efree(p);
112     }
113    
114     // Set output format ('f', 'd', or 'c')
115     bool
116     RcontribSimulManager::SetDataFormat(int ty)
117     {
118     if (outList) {
119     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
120     return false;
121     }
122     switch (ty) {
123     case 'f':
124     dsiz = sizeof(float)*NCSAMP;
125     break;
126     case 'd':
127     dsiz = sizeof(double)*NCSAMP;
128     break;
129     case 'c':
130     dsiz = LSCOLR;
131     break;
132     default:
133     sprintf(errmsg, "unsupported output format '%c'", ty);
134     error(INTERNAL, errmsg);
135     return false;
136     }
137     dtyp = ty;
138     return true;
139     }
140    
141     // static call-back for rcontrib ray-tracing
142     int
143     RcontribSimulManager::RctCall(RAY *r, void *cd)
144     {
145     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
146     return 0;
147     // shadow ray not on source?
148     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
149     return 0;
150    
151     const char * mname = objptr(r->ro->omod)->oname;
152     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
153     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
154     if (!mp)
155     return 0; // not in our modifier list
156    
157 greg 2.4 int bi = 0; // get bin index
158     if (mp->binv) {
159 greg 2.5 worldfunc(RCCONTEXT, r);
160 greg 2.4 set_eparams(mp->params);
161     double bval = evalue(mp->binv);
162     if (bval <= -.5)
163     return 0; // silently ignore negative bin index
164     bi = int(bval + .5);
165     }
166     DCOLORV * dvp = (*mp)[bi];
167 greg 2.1 if (!dvp) {
168 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
169 greg 2.1 error(WARNING, errmsg);
170     return 0;
171     }
172     SCOLOR contr;
173     raycontrib(contr, r, PRIMARY); // compute coefficient
174 greg 2.7 if (rcp->HasFlag(RCcontrib))
175 greg 2.1 smultscolor(contr, r->rcol); // -> value contribution
176     for (int i = 0; i < NCSAMP; i++)
177     *dvp++ += contr[i]; // accumulate color/spectrum
178     return 1;
179     }
180    
181     // check for given format type in string, return last char position
182     static int
183     hasFormat(const char *tst, const char *match)
184     {
185     static const char allPoss[] = "%diouxXfFeEgGaAcsb";
186     const char * s = tst;
187     while (*s) {
188     if (*s++ != '%')
189     continue;
190     while (!strchr(allPoss, *s))
191     s++;
192     if (strchr(match, *s))
193     break;
194     s++;
195     }
196     return (*s != '\0')*(s - tst);
197     }
198    
199     // Add a modifier and arguments, opening output file(s)
200     bool
201     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
202     const char *prms, const char *binval, int bincnt)
203     {
204 greg 2.8 if (!modn | !outspec || !*modn | !*outspec) {
205 greg 2.1 error(WARNING, "ignoring bad call to AddModifier()");
206     return false;
207     }
208     if (!nChan) { // initial call?
209 greg 2.7 if ((xres < 0) | (yres <= 0)) {
210 greg 2.8 error(USER, "xres, yres must be set before first modifier");
211 greg 2.7 return false;
212     }
213 greg 2.1 if (!SetDataFormat(dtyp))
214     return false;
215     nChan = NCSAMP;
216     } else if (nChan != NCSAMP) {
217 greg 2.8 error(USER, "number of spectral channels must be fixed");
218 greg 2.1 return false;
219     }
220     if (Ready()) {
221     error(INTERNAL, "call to AddModifier() after PrepOutput()");
222     return false;
223     }
224     LUENT * lp = lu_find(&modLUT, modn);
225     if (lp->data) {
226     sprintf(errmsg, "duplicate modifier '%s'", modn);
227     error(USER, errmsg);
228     return false;
229     }
230     if (!lp->key) // create new entry
231     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
232    
233     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
234     if (!mp) return false;
235     lp->data = (char *)mp;
236     const int modndx = hasFormat(outspec, "sb");
237     const int binndx = hasFormat(outspec, "diouxX");
238     int bin0 = 0;
239     char fnbuf[512];
240 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
241 greg 2.1 sprintf(fnbuf, outspec, bin0, modn);
242     else
243     sprintf(fnbuf, outspec, modn, bin0);
244     RcontribOutput * olast = NULL;
245     RcontribOutput * op;
246     for (op = outList; op; op = (olast=op)->next) {
247     if (strcmp(op->GetName(), fnbuf))
248     continue;
249     if (modndx) { // this ain't right
250     sprintf(errmsg, "output name collision for '%s'", fnbuf);
251     error(USER, errmsg);
252     return false;
253     }
254     if ((binndx > 0) & (xres > 0)) {
255     sprintf(fnbuf, outspec, ++bin0);
256     continue; // each bin goes to another image
257     }
258     mp->coffset = op->rowBytes; // else add to what's there
259     break;
260     }
261     if (!op) { // create new output channel?
262     op = new RcontribOutput(fnbuf);
263     if (olast) olast->next = op;
264     else outList = op;
265     }
266     mp->opl = op; // first (maybe only) output channel
267     if (modndx) // remember modifier if part of name
268     op->omod = lp->key;
269 greg 2.8 if (binndx) { // append output image/bin list
270 greg 2.1 op->rowBytes += dsiz;
271     op->obin = bin0;
272     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
273 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
274 greg 2.1 sprintf(fnbuf, outspec, bin0+bincnt, modn);
275     else
276     sprintf(fnbuf, outspec, modn, bin0+bincnt);
277     if (!op->next) {
278     olast = op;
279     olast->next = op = new RcontribOutput(fnbuf);
280     if (modndx) op->omod = lp->key;
281     op->obin = bin0+bincnt;
282     } else {
283     op = op->next;
284     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
285     "bin number mismatch in AddModifier()");
286     }
287     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
288     "row offset mismatch in AddModifier()");
289     op->rowBytes += dsiz;
290     }
291     } else // else send all results to this channel
292 greg 2.8 op->rowBytes += mp->nbins*dsiz;
293 greg 2.1 return true;
294     }
295    
296     // Add a file of modifiers with associated arguments
297     bool
298     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
299     const char *prms,
300     const char *binval, int bincnt)
301     {
302     char * path = getpath(const_cast<char *>(modfn),
303     getrlibpath(), R_OK);
304     FILE * fp;
305     if (!path || !(fp = fopen(path, "r"))) {
306     sprintf(errmsg, "cannot %s modifier file '%s'",
307     path ? "open" : "find", modfn);
308     error(SYSTEM, errmsg);
309     return false;
310     }
311     char mod[MAXSTR];
312     while (fgetword(mod, sizeof(mod), fp))
313     if (!AddModifier(mod, outspec, prms, binval, bincnt))
314     return false;
315     fclose(fp);
316     return true;
317     }
318    
319 greg 2.4 // call-back to check if modifier has been loaded
320     static int
321     checkModExists(const LUENT *lp, void *p)
322     {
323     if (modifier(lp->key) != OVOID)
324     return 1;
325    
326     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
327     error(WARNING, errmsg);
328     return 0;
329     }
330    
331 greg 2.1 // Prepare output channels and return # completed rows
332     int
333     RcontribSimulManager::PrepOutput()
334     {
335     if (!outList || !RtraceSimulManager::Ready()) {
336     error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
337     return -1;
338     }
339 greg 2.5 if (!cdsF) {
340     error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
341     return -1;
342     }
343 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
344     return -1;
345    
346 greg 2.1 int remWarnings = 20;
347     for (RcontribOutput *op = outList; op; op = op->next) {
348     if (op->rData) {
349     error(INTERNAL, "output channel already open in PrepOutput()");
350     return -1;
351     }
352     op->nRows = yres * (xres + !xres);
353     op->rData = (*cdsF)(op->ofname, outOp,
354     GetHeadLen()+1024 + op->nRows*op->rowBytes);
355     freeqstr(op->ofname); op->ofname = NULL;
356     if (outOp == RCOrecover) {
357     int rd = op->CheckHeader(this);
358     if (rd < 0)
359     return -1;
360     if (rd >= op->nRows) {
361     if (remWarnings >= 0) {
362     sprintf(errmsg, "recovered output '%s' already done",
363     op->GetName());
364     error(WARNING, remWarnings ? errmsg : "etc...");
365     remWarnings--;
366     }
367     rd = op->nRows;
368     }
369     if (!rInPos | (rInPos > rd)) rInPos = rd;
370     } else if (!op->NewHeader(this))
371     return -1;
372     // make sure there's room
373     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
374     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
375     return -1; // calls error() for us
376     }
377     rowsDone.NewBitMap(outList->nRows); // create row completion map
378     rowsDone.ClearBits(0, rInPos, true);
379     return rInPos;
380     }
381    
382     // Create header in open write-only channel
383     bool
384     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
385     {
386     int headlim = rcp->GetHeadLen() + 1024;
387     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
388     int esiz;
389     const int etyp = rcp->GetFormat(&esiz);
390    
391     strcpy(hdr, HDRSTR);
392     strcat(hdr, "RADIANCE\n");
393     begData = strlen(hdr);
394     strcpy(hdr+begData, rcp->GetHeadStr());
395     begData += rcp->GetHeadLen();
396     if (omod) {
397     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
398     begData += strlen(hdr+begData);
399     }
400     if (obin >= 0) {
401     sprintf(hdr+begData, "BIN=%d\n", obin);
402     begData += strlen(hdr+begData);
403     }
404     strcpy(hdr+begData, ROWZEROSTR);
405     rowCountPos = begData+LNROWSTR;
406     begData += sizeof(ROWZEROSTR)-1;
407 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
408 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
409     begData += strlen(hdr+begData);
410     }
411     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
412     begData += strlen(hdr+begData);
413     if (NCSAMP > 3) {
414     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
415     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
416     begData += strlen(hdr+begData);
417     }
418     if (etyp != 'c') {
419     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
420     begData += strlen(hdr+begData);
421     }
422     int align = 0;
423     switch (etyp) {
424     case 'f':
425     align = sizeof(float);
426     break;
427     case 'd':
428     align = sizeof(double);
429     break;
430     case 'c':
431     break;
432     default:
433     error(INTERNAL, "unsupported data type in NewHeader()");
434     return false;
435     }
436     strcpy(hdr+begData, FMTSTR); // write format string
437     begData += strlen(hdr+begData);
438     strcpy(hdr+begData, formstr(etyp));
439     begData += strlen(hdr+begData);
440     if (align) // align data at end of header
441     while ((begData+2) % align)
442     hdr[begData++] = ' ';
443     hdr[begData++] = '\n'; // EOL for data format
444     hdr[begData++] = '\n'; // end of nominal header
445 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
446     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
447 greg 2.1 begData += strlen(hdr+begData);
448     }
449     return rData->ReleaseMemory(hdr, RDSwrite);
450     }
451    
452     // find string argument in header for named variable
453     static const char *
454     findArgs(const char *hdr, const char *vnm, int len)
455     {
456     const char * npos = strnstr(hdr, vnm, len);
457    
458     if (!npos) return NULL;
459    
460     npos += strlen(vnm); // find start of (first) argument
461     len -= npos - hdr;
462     while (len-- > 0 && (*npos == '=') | isspace(*npos))
463     if (*npos++ == '\n')
464     return NULL;
465    
466     return (len >= 0) ? npos : NULL;
467     }
468    
469     // Load and check header in read/write channel
470     int
471     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
472     {
473     int esiz;
474     const int etyp = rcp->GetFormat(&esiz);
475     const int maxlen = rcp->GetHeadLen() + 1024;
476     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
477     const char * cp;
478     // find end of header
479     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
480     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
481     error(USER, errmsg);
482     return -1;
483     }
484     begData = cp - hdr + 1; // increment again at end
485     // check # components
486     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
487     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
488     error(USER, errmsg);
489     return -1;
490     }
491     // check format
492 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
493     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
494 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
495     error(USER, errmsg);
496     return -1;
497     }
498     // check #columns
499     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
500 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
501 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
502     int(rowBytes/esiz), GetName());
503     error(USER, errmsg);
504     return -1;
505     }
506     // find row count
507     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
508     sprintf(errmsg, "missing NROWS in '%s'", GetName());
509     error(USER, errmsg);
510     return -1;
511     }
512     rowCountPos = cp - hdr;
513     int rlast = atoi(cp);
514     begData++; // advance past closing EOL
515 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
516 greg 2.1 char rbuf[64];
517 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
518 greg 2.1 int rlen = strlen(rbuf);
519     if (strncmp(rbuf, hdr+begData, rlen)) {
520     sprintf(errmsg, "bad resolution string in '%s'", GetName());
521     error(USER, errmsg);
522     return -1;
523     }
524     begData += rlen;
525     }
526     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
527     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
528     }
529    
530     // Rewind calculation (previous results unchanged)
531     bool
532     RcontribSimulManager::ResetRow(int r)
533     {
534     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
535     error(WARNING, "ignoring bad call to ResetRow()");
536     return (r == rInPos);
537     }
538     FlushQueue(); // finish current and reset
539     for (RcontribOutput *op = outList; op; op = op->next)
540     if (!op->SetRowsDone(r))
541     return false;
542    
543     rowsDone.ClearBits(r, rInPos-r, false);
544     rInPos = r;
545     return true;
546     }
547    
548     // call-back for averaging each modifier's contributions to assigned channel(s)
549     static int
550     putModContrib(const LUENT *lp, void *p)
551     {
552     RcontribMod * mp = (RcontribMod *)lp->data;
553     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
554     const double sca = 1./rcp->accum;
555     const DCOLORV * dvp = mp->cbin;
556     RcontribOutput * op = mp->opl;
557     int i, n;
558    
559     switch (rcp->GetFormat()) { // conversion based on output type
560     case 'd': {
561     double * dvo = (double *)op->InsertionP(mp->coffset);
562     for (n = mp->nbins; n--; ) {
563     for (i = 0; i < NCSAMP; i++)
564     *dvo++ = *dvp++ * sca;
565     if ((op->obin >= 0) & (n > 0))
566     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
567     }
568     } break;
569     case 'f': {
570     float * fvo = (float *)op->InsertionP(mp->coffset);
571     for (n = mp->nbins; n--; ) {
572     for (i = 0; i < NCSAMP; i++)
573     *fvo++ = float(*dvp++ * sca);
574     if ((op->obin >= 0) & (n > 0))
575     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
576     }
577     } break;
578     case 'c': {
579     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
580     for (n = mp->nbins; n--; ) {
581     SCOLOR scol;
582     for (i = 0; i < NCSAMP; i++)
583     scol[i] = COLORV(*dvp++ * sca);
584     scolor_scolr(cvo, scol);
585     cvo += LSCOLR;
586     if ((op->obin >= 0) & (n > 0))
587     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
588     }
589     } break;
590     default:
591     error(CONSISTENCY, "unsupported output type in sendModContrib()");
592     return -1;
593     }
594     // clear for next tally
595     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
596     return 1;
597     }
598    
599     // Add a ray/bundle to compute next record
600     int
601     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
602     {
603     if (!Ready())
604     return 0;
605     if (rInPos >= outList->nRows) {
606     error(WARNING, "ComputeRecord() called after last record");
607     return 0;
608     }
609     if (nkids > 0) { // in parent process?
610     int k = GetChild(); // updates output rows
611     if (k < 0) return -1; // can't really happen
612     RowAssignment rass;
613     rass.row = kidRow[k] = rInPos++;
614     rass.ac = accum;
615     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
616     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
617     != sizeof(FVECT)*2*accum) {
618     error(SYSTEM, "cannot write to child; dead process?");
619     return -1;
620     }
621     return accum; // tracing/output happens in child
622     }
623     if (NCSAMP != nChan) {
624     error(INTERNAL, "number of color channels must remain fixed");
625     return -1;
626     }
627     // actual work is done here...
628     if (EnqueueBundle(orig_direc, accum) < 0)
629     return -1;
630    
631     RcontribOutput * op; // prepare output buffers
632     for (op = outList; op; op = op->next)
633     if (!op->GetRow(rInPos))
634     return -1;
635     // convert averages & clear
636     if (lu_doall(&modLUT, putModContrib, this) < 0)
637     return -1;
638     // write buffers
639     for (op = outList; op; op = op->next)
640     op->DoneRow();
641    
642     if (!nkids) // update row if solo process
643     UpdateRowsDone(rInPos++); // XXX increment here is critical
644    
645     return accum;
646     }
647    
648     // Get next available child, returning index or -1 if forceWait & idling
649     int
650     RcontribSimulManager::GetChild(bool forceWait)
651     {
652     if (nkids <= 0)
653     return -1;
654     // take inventory
655     int pn, n = 0;
656     fd_set writeset, errset;
657     FD_ZERO(&writeset); FD_ZERO(&errset);
658     for (pn = nkids; pn--; ) {
659     if (kidRow[pn] < 0) { // child already ready?
660     if (forceWait) continue;
661     return pn; // good enough
662     }
663     FD_SET(kid[pn].w, &writeset); // will check on this one
664     FD_SET(kid[pn].w, &errset);
665     if (kid[pn].w >= n)
666     n = kid[pn].w + 1;
667     }
668     if (!n) // every child is idle?
669     return -1;
670     // wait on "busy" child(ren)
671     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
672     if (errno != EINTR) {
673     error(SYSTEM, "select call failed in GetChild()");
674     return -1;
675     }
676     pn = -1; // get flags set by select
677     for (n = nkids; n--; )
678     if (kidRow[n] >= 0 &&
679     FD_ISSET(kid[n].w, &writeset) |
680     FD_ISSET(kid[n].w, &errset)) {
681     // update output row counts
682 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
683     UpdateRowsDone(kidRow[n]);
684 greg 2.1 kidRow[n] = -1; // flag it available
685     pn = n;
686     }
687     return pn;
688     }
689    
690     // Update row completion bitmap and update outputs (does not change rInPos)
691     bool
692     RcontribSimulManager::UpdateRowsDone(int r)
693     {
694     if (!rowsDone.TestAndSet(r)) {
695     error(WARNING, "redundant call to UpdateRowsDone()");
696     return false;
697     }
698     int nDone = GetRowFinished();
699     if (nDone <= r)
700     return true; // nothing to update, yet
701     for (RcontribOutput *op = outList; op; op = op->next)
702     if (!op->SetRowsDone(nDone))
703     return false;
704     return true; // up-to-date
705     }
706    
707     // Run rcontrib child process (never returns)
708     void
709     RcontribSimulManager::RunChild()
710     {
711     FVECT * vecList = NULL;
712     RowAssignment rass;
713     ssize_t nr;
714    
715     accum = 0;
716     errno = 0;
717     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
718     if (!rass.ac) {
719     error(CONSISTENCY, "bad accumulator count in child");
720     exit(1);
721     }
722     if (rass.ac > accum)
723     vecList = (FVECT *)erealloc(vecList,
724     sizeof(FVECT)*2*rass.ac);
725     accum = rass.ac;
726     rInPos = rass.row;
727    
728     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
729     sizeof(FVECT)*2*accum)
730     break;
731    
732     if (ComputeRecord(vecList) <= 0)
733     exit(1);
734     }
735     if (nr) {
736     error(SYSTEM, "read error in child process");
737     exit(1);
738     }
739     exit(0);
740     }
741    
742     // Add child processes as indicated
743     bool
744     RcontribSimulManager::StartKids(int n2go)
745     {
746     if ((n2go <= 0) | (nkids + n2go <= 1))
747     return false;
748    
749     if (!nkids)
750     cow_memshare(); // preload objects
751    
752     n2go += nkids; // => desired brood size
753     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
754     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
755    
756     fflush(stdout); // shouldn't use, anyway
757     while (nkids < n2go) {
758     kid[nkids] = sp_inactive;
759     kid[nkids].w = dup(1);
760     kid[nkids].flags |= PF_FILT_OUT;
761     int rv = open_process(&kid[nkids], NULL);
762     if (!rv) { // in child process?
763     while (nkids-- > 0)
764     close(kid[nkids].w);
765     free(kid); free(kidRow);
766     kid = NULL; kidRow = NULL;
767     RunChild(); // should never return
768     _exit(1);
769     }
770     if (rv < 0) {
771     error(SYSTEM, "cannot fork worker process");
772     return false;
773     }
774     kidRow[nkids++] = -1; // newborn is ready
775     }
776     return true;
777     }
778    
779     // Reap the indicated number of children (all if 0)
780     int
781     RcontribSimulManager::StopKids(int n2end)
782     {
783     if (nkids <= 0)
784     return 0;
785     FlushQueue();
786     int status = 0;
787     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
788     status = close_processes(kid, nkids);
789     free(kid); free(kidRow);
790     kid = NULL; kidRow = NULL;
791     nkids = 0;
792     } else { // else shrink family
793     int st;
794     while (n2end-- > 0)
795     if ((st = close_process(&kid[--nkids])) > 0)
796     status = st;
797     // could call realloc(), but it hardly matters
798     }
799     if (status) {
800     sprintf(errmsg, "non-zero (%d) status from child", status);
801     error(WARNING, errmsg);
802     }
803     return status;
804     }
805    
806     // Set number of computation threads (0 => #cores)
807     int
808     RcontribSimulManager::SetThreadCount(int nt)
809     {
810     if (!Ready()) {
811     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
812     return 0;
813     }
814     if (nt < 0)
815 greg 2.6 return NThreads();
816 greg 2.1 if (!nt) nt = GetNCores();
817     int status = 0;
818     if (nt == 1)
819     status = StopKids();
820     else if (nt < nkids)
821     status = StopKids(nkids-nt);
822     else if (nt > nkids)
823     StartKids(nt-nkids);
824     if (status) {
825     sprintf(errmsg, "non-zero (%d) status from child", status);
826     error(WARNING, errmsg);
827     }
828 greg 2.6 return NThreads();
829 greg 2.1 }