ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.14
Committed: Fri Oct 17 01:15:53 2025 UTC (2 weeks, 6 days ago) by greg
Branch: MAIN
Changes since 2.13: +28 -18 lines
Log Message:
perf(rxcontrib): Added handshake to ensure each row is complete in children

File Contents

# User Rev Content
#ifndef lint
// RCS identification string; omitted when the `lint` macro is defined
static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.13 2025/10/16 23:14:31 greg Exp $";
#endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
// Function-evaluation context prefix passed to worldfunc() in RctCall()
char	RCCONTEXT[] = "RC.";

// Header labels defined elsewhere; used by NewHeader()/CheckHeader() for the
// header magic line, the byte-order line and the data format line
extern const char	HDRSTR[];
extern const char	BIGEND[];
extern const char	FMTSTR[];

// RdataShare open flags, indexed by RCOutputOp in defDataShare():
// new/exclusive, overwrite if exists, or recover data
int	RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
				RDSread|RDSwrite};

// Placeholder row count written into each new header.  NewHeader() records
// where its digits begin in rowCountPos (presumably so the count can be
// rewritten in place later -- see SetRowsDone()).
static const char	ROWZEROSTR[] = "NROWS=0000000000000000\n";
#define LNROWSTR	6		// length of "NROWS=" (offset of the digits)
33    
// Modifier channel for recording contributions (no constructor/destructor).
// NewRcMod() allocates it as one zeroed block: cbin[] is extended to hold
// nbins*NCSAMP accumulators, and the params string is stored right after
// them.  Release it with FreeRcMod().
struct RcontribMod {
	RcontribOutput *	opl;		// pointer to first output channel
	char *			params;		// parameters string
	EPNODE *		binv;		// bin expression (NULL if 1 bin)
	int			nbins;		// bin count this modifier
	int			coffset;	// column offset in bytes
	DCOLORV			cbin[1];	// bin accumulator (extends struct)
					/// Access specific (spectral) color bin:
					/// pointer to the NCSAMP values of bin n,
					/// or NULL if n is out of range
	DCOLORV *		operator[](int n) {
				if ((n < 0) | (n >= nbins)) return NULL;
				return cbin + n*NCSAMP;
			}
};
48    
// Struct used to assign record calculation to child.
// The parent (ComputeRecord) writes it as raw bytes to the child's pipe,
// followed by 2*ac FVECTs (the orig_direc array for the row); the child
// (RunChild) reads it back from its stdin.
struct RowAssignment {
	uint32	row;		// row to do
	uint32	ac;		// accumulation count
};

// Acknowledgement a child writes (including the terminating nul) after
// finishing each row; GetChild() reads and compares it in the parent
static const char	ROW_DONE[] = "ROW FINISHED\n";
56    
57 greg 2.9 // Get format identifier
58     const char *
59     formstr(int f)
60     {
61     switch (f) {
62     case 'a': return("ascii");
63     case 'f': return("float");
64     case 'd': return("double");
65     case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
66     }
67     return("unknown");
68     }
69    
70 greg 2.1 // Our default data share function
71     RdataShare *
72     defDataShare(const char *name, RCOutputOp op, size_t siz)
73     {
74     return new RdataShareMap(name, RSDOflags[op], siz);
75     }
76    
// Allocate rcontrib accumulator.
//   prms    - parameter string for the bin expression (NULL => "")
//   binexpr - bin expression, or NULL for a single bin
//   ncbins  - number of bins (values < 1 are treated as 1)
// Returns a zeroed RcontribMod with room for ncbins*NCSAMP accumulators and
// a copy of prms, or NULL (after a USER error) if ncbins > 1 but binexpr is
// missing.  A constant bin expression must be 0 and implies a single bin.
RcontribMod *
NewRcMod(const char *prms, const char *binexpr, int ncbins)
{
	if (!prms) prms = "";
	if ((ncbins > 1) & !binexpr) {
		error(USER, "missing bin expression");
		return NULL;
	}
	if (ncbins < 1) ncbins = 1;
					// one block: struct + extra bins + params
					// (cbin[1] already holds one value, hence -1)
	RcontribMod *	mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
				sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
				strlen(prms)+1);

	if (binexpr) {				// get/check bin expression
		mp->binv = eparse(const_cast<char *>(binexpr));
		if (mp->binv->type == NUM) {	// constant expression (0)?
			if ((int)evalue(mp->binv) != 0) {
				sprintf(errmsg, "illegal non-zero constant for bin (%s)",
						binexpr);
				error(USER, errmsg);
			}
			if (ncbins > 1) {
				sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
				error(USER, errmsg);
			}
					// constant 0 => single bin, no expression
			epfree(mp->binv, true);
			mp->binv = NULL;
			prms = "";	// parameters are moot without an expression
			ncbins = 1;
		}
	}
					// params string lives just past the bins
	mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
	mp->nbins = ncbins;
	return mp;
}
114    
115     // Free an RcontribMod
116     void
117     FreeRcMod(void *p)
118     {
119     if (!p) return;
120 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
121     if (bep) epfree(bep, true);
122 greg 2.1 efree(p);
123     }
124    
125     // Set output format ('f', 'd', or 'c')
126     bool
127     RcontribSimulManager::SetDataFormat(int ty)
128     {
129     if (outList) {
130     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
131     return false;
132     }
133     switch (ty) {
134     case 'f':
135     dsiz = sizeof(float)*NCSAMP;
136     break;
137     case 'd':
138     dsiz = sizeof(double)*NCSAMP;
139     break;
140     case 'c':
141     dsiz = LSCOLR;
142     break;
143     default:
144     sprintf(errmsg, "unsupported output format '%c'", ty);
145     error(INTERNAL, errmsg);
146     return false;
147     }
148     dtyp = ty;
149     return true;
150     }
151    
// static call-back for rcontrib ray-tracing.
//   r  - ray being evaluated
//   cd - the RcontribSimulManager that registered this call-back
// If the ray hit an object whose modifier is tracked, adds its contribution
// (coefficient, or value contribution when RCcontrib is set) into the
// modifier's accumulator bin.  Returns 1 if recorded, 0 if ignored.
int
RcontribSimulManager::RctCall(RAY *r, void *cd)
{
	if (!r->ro || r->ro->omod == OVOID)	// hit nothing?
		return 0;
					// shadow ray not on source?
	if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
		return 0;

	const char *		mname = objptr(r->ro->omod)->oname;
	RcontribSimulManager *	rcp = (RcontribSimulManager *)cd;
					// NOTE(review): assumes lu_find() never
					// returns NULL here -- confirm
	RcontribMod *		mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
	if (!mp)
		return 0;		// not in our modifier list

	int	bi = 0;			// get bin index
	if (mp->binv) {
		worldfunc(RCCONTEXT, r);	// ray variables for the expression
		set_eparams(mp->params);
		double	bval = evalue(mp->binv);
		if (bval <= -.5)
			return 0;	// silently ignore negative bin index
		bi = int(bval + .5);	// round to nearest bin
	}
	DCOLORV *	dvp = (*mp)[bi];	// NULL if bin out of range
	if (!dvp) {
		sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
		error(WARNING, errmsg);
		return 0;
	}
	SCOLOR	contr;
	raycontrib(contr, r, PRIMARY);		// compute coefficient
	if (rcp->HasFlag(RCcontrib))
		smultscolor(contr, r->rcol);	// -> value contribution

	for (int i = 0; i < NCSAMP; i++)
		*dvp++ += contr[i];		// accumulate color/spectrum
	return 1;
}
192    
// Check for a printf-style conversion in tst whose type letter is in match.
// Returns the offset of that type letter within tst, or 0 if there is none.
// Flags, width and precision between '%' and the letter are skipped, and
// conversions of any other type (including "%%") are passed over.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convTypes[] = "%diouxXfFeEgGaAcsb";
	size_t			i = 0;

	while (tst[i] != '\0') {
		if (tst[i++] != '%')
			continue;
		while (tst[i] != '\0' && strchr(convTypes, tst[i]) == NULL)
			++i;			// skip flags, width, precision
		if (tst[i] == '\0')
			break;			// unterminated conversion
		if (strchr(match, tst[i]) != NULL)
			return int(i);		// matching conversion type
		++i;				// some other conversion
	}
	return 0;
}
210    
// Add a modifier and arguments, opening output file(s).
//   modn    - name of the modifier to track (must not repeat)
//   outspec - output name, optionally containing a %s/%b conversion for the
//             modifier name and/or an integer conversion for the bin number
//   prms, binval, bincnt - bin parameters, bin expression and bin count,
//             passed to NewRcMod()
// Registers the modifier in modLUT and appends or extends the output
// channel(s) it writes to.  Returns false on error.
bool
RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
				const char *prms, const char *binval, int bincnt)
{
	if (!modn | !outspec || !*modn | !*outspec) {
		error(WARNING, "ignoring bad call to AddModifier()");
		return false;
	}
	if (*outspec == '!') {
		error(USER, "command output not supported by RcontribSimulManager");
		return false;
	}
	if (!nChan) {				// initial call?
		if ((xres < 0) | (yres <= 0)) {
			error(USER, "xres, yres must be set before first modifier");
			return false;
		}
		if (!SetDataFormat(dtyp))
			return false;
		nChan = NCSAMP;
	} else if (nChan != NCSAMP) {
		error(USER, "# spectral channels must be fixed in AddModifier()");
		return false;
	}
	if (Ready()) {
		error(INTERNAL, "call to AddModifier() after PrepOutput()");
		return false;
	}
	LUENT *	lp = lu_find(&modLUT, modn);
	if (lp->data) {
		sprintf(errmsg, "duplicate modifier '%s'", modn);
		error(USER, errmsg);
		return false;
	}
	if (!lp->key)				// create new entry
		lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);

	RcontribMod *	mp = NewRcMod(prms, binval, bincnt);
	if (!mp) return false;
	lp->data = (char *)mp;
					// offsets of the name & bin conversions
	const int	modndx = hasFormat(outspec, "sb");
	const int	binndx = hasFormat(outspec, "diouxX");
	int		bin0 = 0;
	char		fnbuf[512];	// NOTE(review): sprintf() is unbounded here
					// bin argument goes first unless the name
					// conversion precedes the bin conversion
	if (!modndx || (binndx > 0) & (modndx > binndx))
		sprintf(fnbuf, outspec, bin0, modn);
	else
		sprintf(fnbuf, outspec, modn, bin0);
	RcontribOutput *	olast = NULL;
	RcontribOutput *	op;
	for (op = outList; op; op = (olast=op)->next) {
		if (strcmp(op->GetName(), fnbuf))
			continue;
		if (modndx) {			// this ain't right
			sprintf(errmsg, "output name collision for '%s'", fnbuf);
			error(USER, errmsg);
			return false;
		}
		if ((binndx > 0) & (xres > 0)) {
			sprintf(fnbuf, outspec, ++bin0);
			continue;		// each bin goes to another image
		}
		mp->coffset = op->rowBytes;	// else add to what's there
		break;
	}
	if (!op) {				// create new output channel?
		op = new RcontribOutput(fnbuf);
		if (olast) olast->next = op;
		else outList = op;
	}
	mp->opl = op;			// first (maybe only) output channel
	if (modndx)			// remember modifier if part of name
		op->omod = lp->key;
	if (binndx) {			// append output image/bin list
		op->rowBytes += dsiz;
		op->obin = bin0;
					// one channel per remaining bin, created
					// or reused (then checked) in sequence
		for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
			if (!modndx || (binndx > 0) & (modndx > binndx))
				sprintf(fnbuf, outspec, bin0+bincnt, modn);
			else
				sprintf(fnbuf, outspec, modn, bin0+bincnt);
			if (!op->next) {
				olast = op;
				olast->next = op = new RcontribOutput(fnbuf);
				if (modndx) op->omod = lp->key;
				op->obin = bin0+bincnt;
			} else {
				op = op->next;
				CHECK(op->obin != bin0+bincnt, CONSISTENCY,
					"bin number mismatch in AddModifier()");
			}
			CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
				"row offset mismatch in AddModifier()");
			op->rowBytes += dsiz;
		}
	} else				// else send all results to this channel
		op->rowBytes += mp->nbins*dsiz;
	return true;
}
311    
312     // Add a file of modifiers with associated arguments
313     bool
314     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
315     const char *prms,
316     const char *binval, int bincnt)
317     {
318     char * path = getpath(const_cast<char *>(modfn),
319     getrlibpath(), R_OK);
320     FILE * fp;
321     if (!path || !(fp = fopen(path, "r"))) {
322     sprintf(errmsg, "cannot %s modifier file '%s'",
323     path ? "open" : "find", modfn);
324     error(SYSTEM, errmsg);
325     return false;
326     }
327     char mod[MAXSTR];
328     while (fgetword(mod, sizeof(mod), fp))
329 greg 2.11 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
330     fclose(fp);
331 greg 2.1 return false;
332 greg 2.11 }
333 greg 2.1 fclose(fp);
334     return true;
335     }
336    
337 greg 2.4 // call-back to check if modifier has been loaded
338     static int
339     checkModExists(const LUENT *lp, void *p)
340     {
341 greg 2.11 OBJECT mod = modifier(lp->key);
342    
343     if ((mod != OVOID) & (mod < nsceneobjs))
344 greg 2.4 return 1;
345    
346     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
347     error(WARNING, errmsg);
348     return 0;
349     }
350    
351 greg 2.1 // Prepare output channels and return # completed rows
352     int
353     RcontribSimulManager::PrepOutput()
354     {
355     if (!outList || !RtraceSimulManager::Ready()) {
356 greg 2.11 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
357 greg 2.1 return -1;
358     }
359 greg 2.5 if (!cdsF) {
360 greg 2.11 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
361 greg 2.5 return -1;
362     }
363 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
364     return -1;
365    
366 greg 2.11 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
367 greg 2.1 int remWarnings = 20;
368     for (RcontribOutput *op = outList; op; op = op->next) {
369     if (op->rData) {
370     error(INTERNAL, "output channel already open in PrepOutput()");
371     return -1;
372     }
373 greg 2.11 op->nRows = outList->nRows;
374 greg 2.1 op->rData = (*cdsF)(op->ofname, outOp,
375     GetHeadLen()+1024 + op->nRows*op->rowBytes);
376     freeqstr(op->ofname); op->ofname = NULL;
377     if (outOp == RCOrecover) {
378     int rd = op->CheckHeader(this);
379     if (rd < 0)
380     return -1;
381     if (rd >= op->nRows) {
382     if (remWarnings >= 0) {
383 greg 2.11 sprintf(errmsg, "recovered output '%s' is complete",
384 greg 2.1 op->GetName());
385 greg 2.11 error(WARNING, --remWarnings ? errmsg : "etc...");
386 greg 2.1 }
387     rd = op->nRows;
388     }
389     if (!rInPos | (rInPos > rd)) rInPos = rd;
390     } else if (!op->NewHeader(this))
391     return -1;
392     // make sure there's room
393     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
394     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
395     return -1; // calls error() for us
396     }
397     rowsDone.NewBitMap(outList->nRows); // create row completion map
398     rowsDone.ClearBits(0, rInPos, true);
399     return rInPos;
400     }
401    
402     // Create header in open write-only channel
403     bool
404     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
405     {
406     int headlim = rcp->GetHeadLen() + 1024;
407     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
408     int esiz;
409     const int etyp = rcp->GetFormat(&esiz);
410    
411     strcpy(hdr, HDRSTR);
412     strcat(hdr, "RADIANCE\n");
413     begData = strlen(hdr);
414     strcpy(hdr+begData, rcp->GetHeadStr());
415     begData += rcp->GetHeadLen();
416     if (omod) {
417     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
418     begData += strlen(hdr+begData);
419     }
420     if (obin >= 0) {
421     sprintf(hdr+begData, "BIN=%d\n", obin);
422     begData += strlen(hdr+begData);
423     }
424     strcpy(hdr+begData, ROWZEROSTR);
425     rowCountPos = begData+LNROWSTR;
426     begData += sizeof(ROWZEROSTR)-1;
427 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
428 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
429     begData += strlen(hdr+begData);
430     }
431     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
432     begData += strlen(hdr+begData);
433     if (NCSAMP > 3) {
434     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
435     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
436     begData += strlen(hdr+begData);
437     }
438     if (etyp != 'c') {
439     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
440     begData += strlen(hdr+begData);
441     }
442     int align = 0;
443     switch (etyp) {
444     case 'f':
445     align = sizeof(float);
446     break;
447     case 'd':
448     align = sizeof(double);
449     break;
450     case 'c':
451     break;
452     default:
453     error(INTERNAL, "unsupported data type in NewHeader()");
454     return false;
455     }
456     strcpy(hdr+begData, FMTSTR); // write format string
457     begData += strlen(hdr+begData);
458     strcpy(hdr+begData, formstr(etyp));
459     begData += strlen(hdr+begData);
460     if (align) // align data at end of header
461     while ((begData+2) % align)
462     hdr[begData++] = ' ';
463     hdr[begData++] = '\n'; // EOL for data format
464     hdr[begData++] = '\n'; // end of nominal header
465 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
466     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
467 greg 2.1 begData += strlen(hdr+begData);
468     }
469     return rData->ReleaseMemory(hdr, RDSwrite);
470     }
471    
472     // find string argument in header for named variable
473     static const char *
474     findArgs(const char *hdr, const char *vnm, int len)
475     {
476     const char * npos = strnstr(hdr, vnm, len);
477    
478     if (!npos) return NULL;
479    
480     npos += strlen(vnm); // find start of (first) argument
481     len -= npos - hdr;
482     while (len-- > 0 && (*npos == '=') | isspace(*npos))
483     if (*npos++ == '\n')
484     return NULL;
485    
486     return (len >= 0) ? npos : NULL;
487     }
488    
489     // Load and check header in read/write channel
490     int
491     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
492     {
493     int esiz;
494     const int etyp = rcp->GetFormat(&esiz);
495     const int maxlen = rcp->GetHeadLen() + 1024;
496     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
497     const char * cp;
498     // find end of header
499     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
500     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
501     error(USER, errmsg);
502     return -1;
503     }
504     begData = cp - hdr + 1; // increment again at end
505     // check # components
506     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
507     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
508     error(USER, errmsg);
509     return -1;
510     }
511     // check format
512 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
513     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
514 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
515     error(USER, errmsg);
516     return -1;
517     }
518     // check #columns
519     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
520 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
521 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
522     int(rowBytes/esiz), GetName());
523     error(USER, errmsg);
524     return -1;
525     }
526     // find row count
527     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
528     sprintf(errmsg, "missing NROWS in '%s'", GetName());
529     error(USER, errmsg);
530     return -1;
531     }
532     rowCountPos = cp - hdr;
533     int rlast = atoi(cp);
534     begData++; // advance past closing EOL
535 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
536 greg 2.1 char rbuf[64];
537 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
538 greg 2.1 int rlen = strlen(rbuf);
539     if (strncmp(rbuf, hdr+begData, rlen)) {
540     sprintf(errmsg, "bad resolution string in '%s'", GetName());
541     error(USER, errmsg);
542     return -1;
543     }
544     begData += rlen;
545     }
546     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
547     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
548     }
549    
550     // Rewind calculation (previous results unchanged)
551     bool
552     RcontribSimulManager::ResetRow(int r)
553     {
554     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
555     error(WARNING, "ignoring bad call to ResetRow()");
556     return (r == rInPos);
557     }
558     FlushQueue(); // finish current and reset
559     for (RcontribOutput *op = outList; op; op = op->next)
560     if (!op->SetRowsDone(r))
561     return false;
562    
563     rowsDone.ClearBits(r, rInPos-r, false);
564     rInPos = r;
565     return true;
566     }
567    
// call-back for averaging each modifier's contributions to assigned channel(s).
//   lp - modLUT entry whose data is an RcontribMod
//   p  - the RcontribSimulManager (supplies accum and output format)
// Scales each bin by 1/accum and converts it to the output type at the
// modifier's column offset in the current row buffer.  For per-bin outputs
// (obin >= 0) it moves to the next channel after each bin.  It then zeroes
// the accumulator.  Returns 1, or -1 for an unsupported output type.
static int
putModContrib(const LUENT *lp, void *p)
{
	RcontribMod *			mp = (RcontribMod *)lp->data;
	const RcontribSimulManager *	rcp = (const RcontribSimulManager *)p;
	const double			sca = 1./rcp->accum;
	const DCOLORV *			dvp = mp->cbin;
	RcontribOutput *		op = mp->opl;
	int				i, n;

	switch (rcp->GetFormat()) {	// conversion based on output type
	case 'd': {
		double *	dvo = (double *)op->InsertionP(mp->coffset);
		for (n = mp->nbins; n--; ) {
			for (i = 0; i < NCSAMP; i++)
				*dvo++ = *dvp++ * sca;
					// next bin goes to next channel?
			if ((op->obin >= 0) & (n > 0))
				dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
		}
		} break;
	case 'f': {
		float *		fvo = (float *)op->InsertionP(mp->coffset);
		for (n = mp->nbins; n--; ) {
			for (i = 0; i < NCSAMP; i++)
				*fvo++ = float(*dvp++ * sca);
			if ((op->obin >= 0) & (n > 0))
				fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
		}
		} break;
	case 'c': {
		COLRV *		cvo = (COLRV *)op->InsertionP(mp->coffset);
		for (n = mp->nbins; n--; ) {
			SCOLOR	scol;
			for (i = 0; i < NCSAMP; i++)
				scol[i] = COLORV(*dvp++ * sca);
			scolor_scolr(cvo, scol);	// encode color/spectrum
			cvo += LSCOLR;
			if ((op->obin >= 0) & (n > 0))
				cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
		}
		} break;
	default:
		error(CONSISTENCY, "unsupported output type in putModContrib()");
		return -1;
	}
						// clear for next tally
	memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
	return 1;
}
618    
// Add a ray/bundle to compute next record.
//   orig_direc - 2*accum vectors for the row (passed to EnqueueBundle())
// In the parent (nkids > 0) the row is handed to an available child.
// Otherwise it is traced here and written to every output channel.
// Returns accum on success, 0 if not ready or past the last row, -1 on error.
int
RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
{
	if (!Ready())
		return 0;
	if (rInPos >= outList->nRows) {
		error(WARNING, "ComputeRecord() called after last record");
		return 0;
	}
	if (nkids > 0) {			// in parent process?
		int	k = GetChild(false);	// updates output rows
		if (k < 0) return -1;		// someone died?
		RowAssignment	rass;
		rass.row = kidRow[k] = rInPos++;
		rass.ac = accum;
					// send assignment, then the ray vectors
		if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
				writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
					!= sizeof(FVECT)*2*accum) {
			error(SYSTEM, "cannot write to child; dead process?");
			return -1;
		}
		return accum;			// tracing/output happens in child
	}
	if (NCSAMP != nChan) {
		error(INTERNAL, "number of color channels must remain fixed");
		return -1;
	}
						// actual work is done here...
	if (EnqueueBundle(orig_direc, accum) < 0)
		return -1;

	RcontribOutput *	op;		// prepare output buffers
	for (op = outList; op; op = op->next)
		if (!op->GetRow(rInPos))
			return -1;
						// convert averages & clear
	if (lu_doall(&modLUT, putModContrib, this) < 0)
		return -1;
						// write buffers
	for (op = outList; op; op = op->next)
		op->DoneRow();
					// in a child nkids is -1 (left so by the
					// close loop in StartKids()), so rows are
					// tracked by the parent, not here
	if (!nkids)				// update row if solo process
		UpdateRowsDone(rInPos++);	// XXX increment here is critical

	return accum;
}
667    
668     // Get next available child, returning index or -1 if forceWait & idling
669     int
670     RcontribSimulManager::GetChild(bool forceWait)
671     {
672     if (nkids <= 0)
673     return -1;
674     // take inventory
675     int pn, n = 0;
676 greg 2.14 fd_set readset, errset;
677     FD_ZERO(&readset); FD_ZERO(&errset);
678 greg 2.1 for (pn = nkids; pn--; ) {
679     if (kidRow[pn] < 0) { // child already ready?
680     if (forceWait) continue;
681     return pn; // good enough
682     }
683 greg 2.14 FD_SET(kid[pn].r, &readset); // will check on this one
684     FD_SET(kid[pn].r, &errset);
685     if (kid[pn].r >= n)
686     n = kid[pn].r + 1;
687 greg 2.1 }
688     if (!n) // every child is idle?
689     return -1;
690     // wait on "busy" child(ren)
691 greg 2.14 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
692 greg 2.1 if (errno != EINTR) {
693     error(SYSTEM, "select call failed in GetChild()");
694     return -1;
695     }
696 greg 2.14 char buf[sizeof(ROW_DONE)] = "X";
697 greg 2.1 pn = -1; // get flags set by select
698     for (n = nkids; n--; )
699     if (kidRow[n] >= 0 &&
700 greg 2.14 FD_ISSET(kid[n].r, &readset) |
701     FD_ISSET(kid[n].r, &errset)) {
702     // check for error
703     if (FD_ISSET(kid[n].r, &errset) ||
704     read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
705     memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
706     return -1;
707 greg 2.1 // update output row counts
708 greg 2.14 UpdateRowsDone(kidRow[n]);
709     kidRow[n] = -1; // flag child available
710 greg 2.1 pn = n;
711     }
712     return pn;
713     }
714    
715     // Update row completion bitmap and update outputs (does not change rInPos)
716     bool
717     RcontribSimulManager::UpdateRowsDone(int r)
718     {
719     if (!rowsDone.TestAndSet(r)) {
720     error(WARNING, "redundant call to UpdateRowsDone()");
721     return false;
722     }
723     int nDone = GetRowFinished();
724     if (nDone <= r)
725     return true; // nothing to update, yet
726     for (RcontribOutput *op = outList; op; op = op->next)
727     if (!op->SetRowsDone(nDone))
728     return false;
729     return true; // up-to-date
730     }
731    
// Run rcontrib child process (never returns).
// Loops reading a RowAssignment from stdin (fd 0), followed by 2*ac
// vectors.  It computes that row with ComputeRecord(), then writes ROW_DONE
// to stdout (fd 1).  Exits 0 on end of input and 1 on any error, including
// a short read.
void
RcontribSimulManager::RunChild()
{
	FVECT *		vecList = NULL;
	RowAssignment	rass;
	ssize_t		nr;

	accum = 0;
	errno = 0;
	while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
		if (!rass.ac) {
			error(CONSISTENCY, "bad accumulator count in child");
			exit(1);
		}
		if (rass.ac > accum) {		// grow vector buffer as needed
			efree(vecList);
			vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
		}
		accum = rass.ac;
		rInPos = rass.row;

		if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
				sizeof(FVECT)*2*accum)
			break;			// nr is non-zero => error below

		if (ComputeRecord(vecList) <= 0)
			exit(1);
						// signal this row is done
		if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
			exit(1);
	}
	if (nr) {				// partial or failed read
		error(SYSTEM, "read error in child process");
		exit(1);
	}
	exit(0);				// parent closed our input
}
770    
// Add child processes as indicated.
//   n2go - number of children to add
// Returns false if n2go <= 0, if the result would not exceed one process,
// or if a fork fails (children created so far are kept).
// In each new child, pipe ends to earlier siblings are closed, the kid
// arrays are freed and RunChild() is entered.  The close loop leaves nkids
// at -1 there, so neither the parent branch (nkids > 0) nor the solo row
// update (!nkids) in ComputeRecord() applies in a child.
bool
RcontribSimulManager::StartKids(int n2go)
{
	if ((n2go <= 0) | (nkids + n2go <= 1))
		return false;

	if (!nkids)
		cow_memshare();			// preload objects

	n2go += nkids;				// => desired brood size
	kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
	kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);

	fflush(stdout);				// shouldn't use, anyway
	while (nkids < n2go) {
		kid[nkids] = sp_inactive;
		int	rv = open_process(&kid[nkids], NULL);
		if (!rv) {			// in child process?
			while (nkids-- > 0) {	// drop siblings' pipes
				close(kid[nkids].r);
				close(kid[nkids].w);
			}
			free(kid); free(kidRow);
			kid = NULL; kidRow = NULL;
			RunChild();		// should never return
			_exit(1);
		}
		if (rv < 0) {
			error(SYSTEM, "cannot fork worker process");
			return false;
		}
		kidRow[nkids++] = -1;		// newborn is ready
	}
	return true;
}
807    
808     // Reap the indicated number of children (all if 0)
809     int
810     RcontribSimulManager::StopKids(int n2end)
811     {
812     if (nkids <= 0)
813     return 0;
814     FlushQueue();
815     int status = 0;
816     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
817     status = close_processes(kid, nkids);
818     free(kid); free(kidRow);
819     kid = NULL; kidRow = NULL;
820     nkids = 0;
821     } else { // else shrink family
822     int st;
823     while (n2end-- > 0)
824     if ((st = close_process(&kid[--nkids])) > 0)
825     status = st;
826     // could call realloc(), but it hardly matters
827     }
828     if (status) {
829     sprintf(errmsg, "non-zero (%d) status from child", status);
830     error(WARNING, errmsg);
831     }
832     return status;
833     }
834    
835     // Set number of computation threads (0 => #cores)
836     int
837     RcontribSimulManager::SetThreadCount(int nt)
838     {
839     if (!Ready()) {
840     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
841     return 0;
842     }
843     if (nt < 0)
844 greg 2.6 return NThreads();
845 greg 2.1 if (!nt) nt = GetNCores();
846     int status = 0;
847     if (nt == 1)
848     status = StopKids();
849     else if (nt < nkids)
850     status = StopKids(nkids-nt);
851     else if (nt > nkids)
852     StartKids(nt-nkids);
853     if (status) {
854     sprintf(errmsg, "non-zero (%d) status from child", status);
855     error(WARNING, errmsg);
856     }
857 greg 2.6 return NThreads();
858 greg 2.1 }