ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.18
Committed: Fri Oct 24 18:37:05 2025 UTC (2 weeks ago) by greg
Branch: MAIN
Changes since 2.17: +12 -2 lines
Log Message:
fix(rxcontrib,rxfluxmtx): recovery mode issues

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.18 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.17 2025/10/23 16:33:39 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27     // new/exclusive, overwrite if exists, or recover data
28     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29     RDSread|RDSwrite};
30    
31     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32     #define LNROWSTR 6
33    
34     // Modifier channel for recording contributions (no constructor/destructor)
35     struct RcontribMod {
36     RcontribOutput * opl; // pointer to first output channel
37     char * params; // parameters string
38 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
39 greg 2.1 int nbins; // bin count this modifier
40     int coffset; // column offset in bytes
41     DCOLORV cbin[1]; // bin accumulator (extends struct)
42     /// Access specific (spectral) color bin
43     DCOLORV * operator[](int n) {
44     if ((n < 0) | (n >= nbins)) return NULL;
45     return cbin + n*NCSAMP;
46     }
47     };
48    
49 greg 2.2 // Struct used to assign record calculation to child
50 greg 2.1 struct RowAssignment {
51     uint32 row; // row to do
52     uint32 ac; // accumulation count
53     };
54    
55 greg 2.14 static const char ROW_DONE[] = "ROW FINISHED\n";
56    
57 greg 2.15 // allocate rcontrib accumulator
58     static RcontribMod *
59 greg 2.1 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60     {
61 greg 2.16 if (binexpr && !*binexpr) binexpr = NULL;
62 greg 2.1 if (!prms) prms = "";
63 greg 2.10 if ((ncbins > 1) & !binexpr) {
64 greg 2.8 error(USER, "missing bin expression");
65 greg 2.4 return NULL;
66     }
67 greg 2.10 if (ncbins < 1) ncbins = 1;
68    
69 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71     strlen(prms)+1);
72    
73 greg 2.11 if (binexpr) { // get/check bin expression
74 greg 2.4 mp->binv = eparse(const_cast<char *>(binexpr));
75 greg 2.10 if (mp->binv->type == NUM) { // constant expression (0)?
76 greg 2.11 if ((int)evalue(mp->binv) != 0) {
77     sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 greg 2.10 binexpr);
79     error(USER, errmsg);
80     }
81     if (ncbins > 1) {
82     sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83     error(USER, errmsg);
84     }
85     epfree(mp->binv, true);
86     mp->binv = NULL;
87     prms = "";
88     ncbins = 1;
89     }
90 greg 2.4 }
91 greg 2.10 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 greg 2.1 mp->nbins = ncbins;
93     return mp;
94     }
95    
96 greg 2.15 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 greg 2.1 void
98     FreeRcMod(void *p)
99     {
100     if (!p) return;
101 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
102     if (bep) epfree(bep, true);
103 greg 2.1 efree(p);
104     }
105    
106 greg 2.15 // Get format identifier
107     const char *
108     formstr(int f)
109     {
110     switch (f) {
111     case 'a': return("ascii");
112     case 'f': return("float");
113     case 'd': return("double");
114     case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115     }
116     return("unknown");
117     }
118    
119 greg 2.17 // Standard file data share function
120 greg 2.15 RdataShare *
121 greg 2.17 fileDataShare(const char *name, RCOutputOp op, size_t siz)
122     {
123 greg 2.18 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
124     sprintf(errmsg, "cannot recover from '%s'", name);
125     error(SYSTEM, errmsg);
126     return NULL;
127     }
128 greg 2.17 return new RdataShareFile(name, RSDOflags[op], siz);
129     }
130    
131     // Memory-mapped data share function
132     RdataShare *
133     mapDataShare(const char *name, RCOutputOp op, size_t siz)
134 greg 2.15 {
135 greg 2.18 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
136     sprintf(errmsg, "cannot recover from '%s'", name);
137     error(SYSTEM, errmsg);
138     return NULL;
139     }
140 greg 2.15 return new RdataShareMap(name, RSDOflags[op], siz);
141     }
142    
143 greg 2.1 // Set output format ('f', 'd', or 'c')
144     bool
145     RcontribSimulManager::SetDataFormat(int ty)
146     {
147     if (outList) {
148     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
149     return false;
150     }
151     switch (ty) {
152     case 'f':
153     dsiz = sizeof(float)*NCSAMP;
154     break;
155     case 'd':
156     dsiz = sizeof(double)*NCSAMP;
157     break;
158     case 'c':
159     dsiz = LSCOLR;
160     break;
161     default:
162     sprintf(errmsg, "unsupported output format '%c'", ty);
163     error(INTERNAL, errmsg);
164     return false;
165     }
166     dtyp = ty;
167     return true;
168     }
169    
170     // static call-back for rcontrib ray-tracing
171     int
172     RcontribSimulManager::RctCall(RAY *r, void *cd)
173     {
174     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
175     return 0;
176     // shadow ray not on source?
177     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
178     return 0;
179    
180     const char * mname = objptr(r->ro->omod)->oname;
181     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
182     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
183     if (!mp)
184     return 0; // not in our modifier list
185    
186 greg 2.4 int bi = 0; // get bin index
187     if (mp->binv) {
188 greg 2.5 worldfunc(RCCONTEXT, r);
189 greg 2.4 set_eparams(mp->params);
190     double bval = evalue(mp->binv);
191     if (bval <= -.5)
192     return 0; // silently ignore negative bin index
193     bi = int(bval + .5);
194     }
195     DCOLORV * dvp = (*mp)[bi];
196 greg 2.1 if (!dvp) {
197 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
198 greg 2.1 error(WARNING, errmsg);
199     return 0;
200     }
201     SCOLOR contr;
202     raycontrib(contr, r, PRIMARY); // compute coefficient
203 greg 2.7 if (rcp->HasFlag(RCcontrib))
204 greg 2.1 smultscolor(contr, r->rcol); // -> value contribution
205 greg 2.11
206 greg 2.1 for (int i = 0; i < NCSAMP; i++)
207     *dvp++ += contr[i]; // accumulate color/spectrum
208     return 1;
209     }
210    
// Look for a printf-style conversion in tst whose type letter is in match.
// Returns the index of that type letter, or 0 if there is none.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";

	for (const char *cp = tst; *cp; ) {
		if (*cp++ != '%')
			continue;
		cp += strcspn(cp, convChars);	// skip flags, width, etc.
		if (!*cp)
			return 0;		// conversion never completed
		if (strchr(match, *cp))
			return int(cp - tst);
		cp++;				// some other conversion
	}
	return 0;
}
228    
229     // Add a modifier and arguments, opening output file(s)
230     bool
231     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
232     const char *prms, const char *binval, int bincnt)
233     {
234 greg 2.8 if (!modn | !outspec || !*modn | !*outspec) {
235 greg 2.1 error(WARNING, "ignoring bad call to AddModifier()");
236     return false;
237     }
238 greg 2.12 if (*outspec == '!') {
239     error(USER, "command output not supported by RcontribSimulManager");
240     return false;
241     }
242 greg 2.1 if (!nChan) { // initial call?
243 greg 2.7 if ((xres < 0) | (yres <= 0)) {
244 greg 2.8 error(USER, "xres, yres must be set before first modifier");
245 greg 2.7 return false;
246     }
247 greg 2.1 if (!SetDataFormat(dtyp))
248     return false;
249     nChan = NCSAMP;
250     } else if (nChan != NCSAMP) {
251 greg 2.11 error(USER, "# spectral channels must be fixed in AddModifier()");
252 greg 2.1 return false;
253     }
254     if (Ready()) {
255     error(INTERNAL, "call to AddModifier() after PrepOutput()");
256     return false;
257     }
258     LUENT * lp = lu_find(&modLUT, modn);
259     if (lp->data) {
260     sprintf(errmsg, "duplicate modifier '%s'", modn);
261     error(USER, errmsg);
262     return false;
263     }
264     if (!lp->key) // create new entry
265     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
266    
267     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
268     if (!mp) return false;
269     lp->data = (char *)mp;
270     const int modndx = hasFormat(outspec, "sb");
271     const int binndx = hasFormat(outspec, "diouxX");
272     int bin0 = 0;
273     char fnbuf[512];
274 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
275 greg 2.1 sprintf(fnbuf, outspec, bin0, modn);
276     else
277     sprintf(fnbuf, outspec, modn, bin0);
278     RcontribOutput * olast = NULL;
279     RcontribOutput * op;
280     for (op = outList; op; op = (olast=op)->next) {
281     if (strcmp(op->GetName(), fnbuf))
282     continue;
283     if (modndx) { // this ain't right
284     sprintf(errmsg, "output name collision for '%s'", fnbuf);
285     error(USER, errmsg);
286     return false;
287     }
288     if ((binndx > 0) & (xres > 0)) {
289     sprintf(fnbuf, outspec, ++bin0);
290     continue; // each bin goes to another image
291     }
292     mp->coffset = op->rowBytes; // else add to what's there
293     break;
294     }
295     if (!op) { // create new output channel?
296     op = new RcontribOutput(fnbuf);
297     if (olast) olast->next = op;
298     else outList = op;
299     }
300     mp->opl = op; // first (maybe only) output channel
301     if (modndx) // remember modifier if part of name
302     op->omod = lp->key;
303 greg 2.8 if (binndx) { // append output image/bin list
304 greg 2.1 op->rowBytes += dsiz;
305     op->obin = bin0;
306     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
307 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
308 greg 2.1 sprintf(fnbuf, outspec, bin0+bincnt, modn);
309     else
310     sprintf(fnbuf, outspec, modn, bin0+bincnt);
311     if (!op->next) {
312     olast = op;
313     olast->next = op = new RcontribOutput(fnbuf);
314     if (modndx) op->omod = lp->key;
315     op->obin = bin0+bincnt;
316     } else {
317     op = op->next;
318     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
319     "bin number mismatch in AddModifier()");
320     }
321     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
322     "row offset mismatch in AddModifier()");
323     op->rowBytes += dsiz;
324     }
325     } else // else send all results to this channel
326 greg 2.8 op->rowBytes += mp->nbins*dsiz;
327 greg 2.1 return true;
328     }
329    
330     // Add a file of modifiers with associated arguments
331     bool
332     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
333     const char *prms,
334     const char *binval, int bincnt)
335     {
336     char * path = getpath(const_cast<char *>(modfn),
337     getrlibpath(), R_OK);
338     FILE * fp;
339     if (!path || !(fp = fopen(path, "r"))) {
340     sprintf(errmsg, "cannot %s modifier file '%s'",
341     path ? "open" : "find", modfn);
342     error(SYSTEM, errmsg);
343     return false;
344     }
345     char mod[MAXSTR];
346     while (fgetword(mod, sizeof(mod), fp))
347 greg 2.11 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
348     fclose(fp);
349 greg 2.1 return false;
350 greg 2.11 }
351 greg 2.1 fclose(fp);
352     return true;
353     }
354    
355 greg 2.4 // call-back to check if modifier has been loaded
356     static int
357     checkModExists(const LUENT *lp, void *p)
358     {
359 greg 2.11 OBJECT mod = modifier(lp->key);
360    
361     if ((mod != OVOID) & (mod < nsceneobjs))
362 greg 2.4 return 1;
363    
364     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
365     error(WARNING, errmsg);
366     return 0;
367     }
368    
369 greg 2.1 // Prepare output channels and return # completed rows
370     int
371     RcontribSimulManager::PrepOutput()
372     {
373     if (!outList || !RtraceSimulManager::Ready()) {
374 greg 2.11 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
375 greg 2.1 return -1;
376     }
377 greg 2.5 if (!cdsF) {
378 greg 2.11 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
379 greg 2.5 return -1;
380     }
381 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
382     return -1;
383    
384 greg 2.11 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
385 greg 2.1 int remWarnings = 20;
386     for (RcontribOutput *op = outList; op; op = op->next) {
387     if (op->rData) {
388     error(INTERNAL, "output channel already open in PrepOutput()");
389     return -1;
390     }
391 greg 2.11 op->nRows = outList->nRows;
392 greg 2.1 op->rData = (*cdsF)(op->ofname, outOp,
393     GetHeadLen()+1024 + op->nRows*op->rowBytes);
394     freeqstr(op->ofname); op->ofname = NULL;
395     if (outOp == RCOrecover) {
396     int rd = op->CheckHeader(this);
397     if (rd < 0)
398     return -1;
399     if (rd >= op->nRows) {
400 greg 2.18 if (remWarnings > 0) {
401 greg 2.11 sprintf(errmsg, "recovered output '%s' is complete",
402 greg 2.1 op->GetName());
403 greg 2.11 error(WARNING, --remWarnings ? errmsg : "etc...");
404 greg 2.1 }
405     rd = op->nRows;
406     }
407     if (!rInPos | (rInPos > rd)) rInPos = rd;
408     } else if (!op->NewHeader(this))
409     return -1;
410     // make sure there's room
411     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
412     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
413     return -1; // calls error() for us
414     }
415     rowsDone.NewBitMap(outList->nRows); // create row completion map
416     rowsDone.ClearBits(0, rInPos, true);
417     return rInPos;
418     }
419    
420     // Create header in open write-only channel
421     bool
422     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
423     {
424     int headlim = rcp->GetHeadLen() + 1024;
425     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
426     int esiz;
427     const int etyp = rcp->GetFormat(&esiz);
428    
429     strcpy(hdr, HDRSTR);
430     strcat(hdr, "RADIANCE\n");
431     begData = strlen(hdr);
432     strcpy(hdr+begData, rcp->GetHeadStr());
433     begData += rcp->GetHeadLen();
434     if (omod) {
435     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
436     begData += strlen(hdr+begData);
437     }
438     if (obin >= 0) {
439     sprintf(hdr+begData, "BIN=%d\n", obin);
440     begData += strlen(hdr+begData);
441     }
442     strcpy(hdr+begData, ROWZEROSTR);
443     rowCountPos = begData+LNROWSTR;
444     begData += sizeof(ROWZEROSTR)-1;
445 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
446 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
447     begData += strlen(hdr+begData);
448     }
449     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
450     begData += strlen(hdr+begData);
451     if (NCSAMP > 3) {
452     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
453     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
454     begData += strlen(hdr+begData);
455     }
456     if (etyp != 'c') {
457     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
458     begData += strlen(hdr+begData);
459     }
460     int align = 0;
461     switch (etyp) {
462     case 'f':
463     align = sizeof(float);
464     break;
465     case 'd':
466     align = sizeof(double);
467     break;
468     case 'c':
469     break;
470     default:
471     error(INTERNAL, "unsupported data type in NewHeader()");
472     return false;
473     }
474     strcpy(hdr+begData, FMTSTR); // write format string
475     begData += strlen(hdr+begData);
476     strcpy(hdr+begData, formstr(etyp));
477     begData += strlen(hdr+begData);
478     if (align) // align data at end of header
479     while ((begData+2) % align)
480     hdr[begData++] = ' ';
481     hdr[begData++] = '\n'; // EOL for data format
482     hdr[begData++] = '\n'; // end of nominal header
483 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
484     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
485 greg 2.1 begData += strlen(hdr+begData);
486     }
487     return rData->ReleaseMemory(hdr, RDSwrite);
488     }
489    
490     // find string argument in header for named variable
491     static const char *
492     findArgs(const char *hdr, const char *vnm, int len)
493     {
494     const char * npos = strnstr(hdr, vnm, len);
495    
496     if (!npos) return NULL;
497    
498     npos += strlen(vnm); // find start of (first) argument
499     len -= npos - hdr;
500     while (len-- > 0 && (*npos == '=') | isspace(*npos))
501     if (*npos++ == '\n')
502     return NULL;
503    
504     return (len >= 0) ? npos : NULL;
505     }
506    
507     // Load and check header in read/write channel
508     int
509     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
510     {
511     int esiz;
512     const int etyp = rcp->GetFormat(&esiz);
513     const int maxlen = rcp->GetHeadLen() + 1024;
514     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
515     const char * cp;
516     // find end of header
517     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
518     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
519     error(USER, errmsg);
520     return -1;
521     }
522     begData = cp - hdr + 1; // increment again at end
523     // check # components
524     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
525     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
526     error(USER, errmsg);
527     return -1;
528     }
529     // check format
530 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
531     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
532 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
533     error(USER, errmsg);
534     return -1;
535     }
536     // check #columns
537     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
538 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
539 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
540     int(rowBytes/esiz), GetName());
541     error(USER, errmsg);
542     return -1;
543     }
544     // find row count
545     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
546     sprintf(errmsg, "missing NROWS in '%s'", GetName());
547     error(USER, errmsg);
548     return -1;
549     }
550     rowCountPos = cp - hdr;
551     int rlast = atoi(cp);
552     begData++; // advance past closing EOL
553 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
554 greg 2.1 char rbuf[64];
555 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
556 greg 2.1 int rlen = strlen(rbuf);
557     if (strncmp(rbuf, hdr+begData, rlen)) {
558     sprintf(errmsg, "bad resolution string in '%s'", GetName());
559     error(USER, errmsg);
560     return -1;
561     }
562     begData += rlen;
563     }
564     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
565     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
566     }
567    
568     // Rewind calculation (previous results unchanged)
569     bool
570     RcontribSimulManager::ResetRow(int r)
571     {
572     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
573     error(WARNING, "ignoring bad call to ResetRow()");
574     return (r == rInPos);
575     }
576     FlushQueue(); // finish current and reset
577     for (RcontribOutput *op = outList; op; op = op->next)
578     if (!op->SetRowsDone(r))
579     return false;
580    
581     rowsDone.ClearBits(r, rInPos-r, false);
582     rInPos = r;
583     return true;
584     }
585    
586     // call-back for averaging each modifier's contributions to assigned channel(s)
587     static int
588     putModContrib(const LUENT *lp, void *p)
589     {
590     RcontribMod * mp = (RcontribMod *)lp->data;
591     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
592     const double sca = 1./rcp->accum;
593     const DCOLORV * dvp = mp->cbin;
594     RcontribOutput * op = mp->opl;
595     int i, n;
596    
597     switch (rcp->GetFormat()) { // conversion based on output type
598     case 'd': {
599     double * dvo = (double *)op->InsertionP(mp->coffset);
600     for (n = mp->nbins; n--; ) {
601     for (i = 0; i < NCSAMP; i++)
602     *dvo++ = *dvp++ * sca;
603     if ((op->obin >= 0) & (n > 0))
604     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
605     }
606     } break;
607     case 'f': {
608     float * fvo = (float *)op->InsertionP(mp->coffset);
609     for (n = mp->nbins; n--; ) {
610     for (i = 0; i < NCSAMP; i++)
611     *fvo++ = float(*dvp++ * sca);
612     if ((op->obin >= 0) & (n > 0))
613     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
614     }
615     } break;
616     case 'c': {
617     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
618     for (n = mp->nbins; n--; ) {
619     SCOLOR scol;
620     for (i = 0; i < NCSAMP; i++)
621     scol[i] = COLORV(*dvp++ * sca);
622     scolor_scolr(cvo, scol);
623     cvo += LSCOLR;
624     if ((op->obin >= 0) & (n > 0))
625     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
626     }
627     } break;
628     default:
629 greg 2.13 error(CONSISTENCY, "unsupported output type in putModContrib()");
630 greg 2.1 return -1;
631     }
632     // clear for next tally
633     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
634     return 1;
635     }
636    
637     // Add a ray/bundle to compute next record
638     int
639     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
640     {
641     if (!Ready())
642     return 0;
643     if (rInPos >= outList->nRows) {
644     error(WARNING, "ComputeRecord() called after last record");
645     return 0;
646     }
647     if (nkids > 0) { // in parent process?
648 greg 2.14 int k = GetChild(false); // updates output rows
649     if (k < 0) return -1; // someone died?
650 greg 2.1 RowAssignment rass;
651     rass.row = kidRow[k] = rInPos++;
652     rass.ac = accum;
653     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
654     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
655     != sizeof(FVECT)*2*accum) {
656     error(SYSTEM, "cannot write to child; dead process?");
657     return -1;
658     }
659     return accum; // tracing/output happens in child
660     }
661     if (NCSAMP != nChan) {
662     error(INTERNAL, "number of color channels must remain fixed");
663     return -1;
664     }
665     // actual work is done here...
666     if (EnqueueBundle(orig_direc, accum) < 0)
667     return -1;
668    
669     RcontribOutput * op; // prepare output buffers
670     for (op = outList; op; op = op->next)
671     if (!op->GetRow(rInPos))
672     return -1;
673     // convert averages & clear
674     if (lu_doall(&modLUT, putModContrib, this) < 0)
675     return -1;
676     // write buffers
677     for (op = outList; op; op = op->next)
678     op->DoneRow();
679    
680     if (!nkids) // update row if solo process
681     UpdateRowsDone(rInPos++); // XXX increment here is critical
682    
683     return accum;
684     }
685    
686     // Get next available child, returning index or -1 if forceWait & idling
687     int
688     RcontribSimulManager::GetChild(bool forceWait)
689     {
690     if (nkids <= 0)
691     return -1;
692     // take inventory
693     int pn, n = 0;
694 greg 2.14 fd_set readset, errset;
695     FD_ZERO(&readset); FD_ZERO(&errset);
696 greg 2.1 for (pn = nkids; pn--; ) {
697     if (kidRow[pn] < 0) { // child already ready?
698     if (forceWait) continue;
699     return pn; // good enough
700     }
701 greg 2.14 FD_SET(kid[pn].r, &readset); // will check on this one
702     FD_SET(kid[pn].r, &errset);
703     if (kid[pn].r >= n)
704     n = kid[pn].r + 1;
705 greg 2.1 }
706     if (!n) // every child is idle?
707     return -1;
708     // wait on "busy" child(ren)
709 greg 2.14 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
710 greg 2.1 if (errno != EINTR) {
711     error(SYSTEM, "select call failed in GetChild()");
712     return -1;
713     }
714 greg 2.14 char buf[sizeof(ROW_DONE)] = "X";
715 greg 2.1 pn = -1; // get flags set by select
716     for (n = nkids; n--; )
717     if (kidRow[n] >= 0 &&
718 greg 2.14 FD_ISSET(kid[n].r, &readset) |
719     FD_ISSET(kid[n].r, &errset)) {
720     // check for error
721     if (FD_ISSET(kid[n].r, &errset) ||
722     read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
723     memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
724     return -1;
725 greg 2.1 // update output row counts
726 greg 2.14 UpdateRowsDone(kidRow[n]);
727     kidRow[n] = -1; // flag child available
728 greg 2.1 pn = n;
729     }
730     return pn;
731     }
732    
733     // Update row completion bitmap and update outputs (does not change rInPos)
734     bool
735     RcontribSimulManager::UpdateRowsDone(int r)
736     {
737     if (!rowsDone.TestAndSet(r)) {
738     error(WARNING, "redundant call to UpdateRowsDone()");
739     return false;
740     }
741     int nDone = GetRowFinished();
742     if (nDone <= r)
743     return true; // nothing to update, yet
744     for (RcontribOutput *op = outList; op; op = op->next)
745     if (!op->SetRowsDone(nDone))
746     return false;
747     return true; // up-to-date
748     }
749    
750     // Run rcontrib child process (never returns)
751     void
752     RcontribSimulManager::RunChild()
753     {
754     FVECT * vecList = NULL;
755     RowAssignment rass;
756     ssize_t nr;
757    
758     accum = 0;
759     errno = 0;
760     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
761     if (!rass.ac) {
762     error(CONSISTENCY, "bad accumulator count in child");
763     exit(1);
764     }
765 greg 2.13 if (rass.ac > accum) {
766     efree(vecList);
767     vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
768     }
769 greg 2.1 accum = rass.ac;
770     rInPos = rass.row;
771    
772     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
773     sizeof(FVECT)*2*accum)
774     break;
775    
776     if (ComputeRecord(vecList) <= 0)
777     exit(1);
778 greg 2.14 // signal this row is done
779     if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
780     exit(1);
781 greg 2.1 }
782     if (nr) {
783     error(SYSTEM, "read error in child process");
784     exit(1);
785     }
786     exit(0);
787     }
788    
789     // Add child processes as indicated
790     bool
791     RcontribSimulManager::StartKids(int n2go)
792     {
793     if ((n2go <= 0) | (nkids + n2go <= 1))
794     return false;
795    
796     if (!nkids)
797     cow_memshare(); // preload objects
798    
799     n2go += nkids; // => desired brood size
800     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
801     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
802    
803     fflush(stdout); // shouldn't use, anyway
804     while (nkids < n2go) {
805     kid[nkids] = sp_inactive;
806     int rv = open_process(&kid[nkids], NULL);
807     if (!rv) { // in child process?
808 greg 2.14 while (nkids-- > 0) {
809     close(kid[nkids].r);
810 greg 2.1 close(kid[nkids].w);
811 greg 2.14 }
812 greg 2.1 free(kid); free(kidRow);
813     kid = NULL; kidRow = NULL;
814     RunChild(); // should never return
815     _exit(1);
816     }
817     if (rv < 0) {
818     error(SYSTEM, "cannot fork worker process");
819     return false;
820     }
821     kidRow[nkids++] = -1; // newborn is ready
822     }
823     return true;
824     }
825    
826     // Reap the indicated number of children (all if 0)
827     int
828     RcontribSimulManager::StopKids(int n2end)
829     {
830     if (nkids <= 0)
831     return 0;
832     FlushQueue();
833     int status = 0;
834     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
835     status = close_processes(kid, nkids);
836     free(kid); free(kidRow);
837     kid = NULL; kidRow = NULL;
838     nkids = 0;
839     } else { // else shrink family
840     int st;
841     while (n2end-- > 0)
842     if ((st = close_process(&kid[--nkids])) > 0)
843     status = st;
844     // could call realloc(), but it hardly matters
845     }
846     if (status) {
847     sprintf(errmsg, "non-zero (%d) status from child", status);
848     error(WARNING, errmsg);
849     }
850     return status;
851     }
852    
853     // Set number of computation threads (0 => #cores)
854     int
855     RcontribSimulManager::SetThreadCount(int nt)
856     {
857     if (!Ready()) {
858     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
859     return 0;
860     }
861     if (nt < 0)
862 greg 2.6 return NThreads();
863 greg 2.1 if (!nt) nt = GetNCores();
864     int status = 0;
865     if (nt == 1)
866     status = StopKids();
867     else if (nt < nkids)
868     status = StopKids(nkids-nt);
869     else if (nt > nkids)
870     StartKids(nt-nkids);
871     if (status) {
872     sprintf(errmsg, "non-zero (%d) status from child", status);
873     error(WARNING, errmsg);
874     }
875 greg 2.6 return NThreads();
876 greg 2.1 }