ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.16
Committed: Thu Oct 23 01:26:48 2025 UTC (13 days, 4 hours ago) by greg
Branch: MAIN
Changes since 2.15: +2 -1 lines
Log Message:
fix(rxfluxmtx): Added -t option and numerous bug fixes

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.16 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.15 2025/10/17 17:43:53 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
21     char RCCONTEXT[] = "RC.";
22    
23     extern const char HDRSTR[];
24     extern const char BIGEND[];
25     extern const char FMTSTR[];
26    
27     // new/exclusive, overwrite if exists, or recover data
28     int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29     RDSread|RDSwrite};
30    
31     static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32     #define LNROWSTR 6
33    
34     // Modifier channel for recording contributions (no constructor/destructor)
35     struct RcontribMod {
36     RcontribOutput * opl; // pointer to first output channel
37     char * params; // parameters string
38 greg 2.4 EPNODE * binv; // bin expression (NULL if 1 bin)
39 greg 2.1 int nbins; // bin count this modifier
40     int coffset; // column offset in bytes
41     DCOLORV cbin[1]; // bin accumulator (extends struct)
42     /// Access specific (spectral) color bin
43     DCOLORV * operator[](int n) {
44     if ((n < 0) | (n >= nbins)) return NULL;
45     return cbin + n*NCSAMP;
46     }
47     };
48    
49 greg 2.2 // Struct used to assign record calculation to child
50 greg 2.1 struct RowAssignment {
51     uint32 row; // row to do
52     uint32 ac; // accumulation count
53     };
54    
55 greg 2.14 static const char ROW_DONE[] = "ROW FINISHED\n";
56    
57 greg 2.15 // allocate rcontrib accumulator
58     static RcontribMod *
59 greg 2.1 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60     {
61 greg 2.16 if (binexpr && !*binexpr) binexpr = NULL;
62 greg 2.1 if (!prms) prms = "";
63 greg 2.10 if ((ncbins > 1) & !binexpr) {
64 greg 2.8 error(USER, "missing bin expression");
65 greg 2.4 return NULL;
66     }
67 greg 2.10 if (ncbins < 1) ncbins = 1;
68    
69 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71     strlen(prms)+1);
72    
73 greg 2.11 if (binexpr) { // get/check bin expression
74 greg 2.4 mp->binv = eparse(const_cast<char *>(binexpr));
75 greg 2.10 if (mp->binv->type == NUM) { // constant expression (0)?
76 greg 2.11 if ((int)evalue(mp->binv) != 0) {
77     sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 greg 2.10 binexpr);
79     error(USER, errmsg);
80     }
81     if (ncbins > 1) {
82     sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83     error(USER, errmsg);
84     }
85     epfree(mp->binv, true);
86     mp->binv = NULL;
87     prms = "";
88     ncbins = 1;
89     }
90 greg 2.4 }
91 greg 2.10 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 greg 2.1 mp->nbins = ncbins;
93     return mp;
94     }
95    
96 greg 2.15 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 greg 2.1 void
98     FreeRcMod(void *p)
99     {
100     if (!p) return;
101 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
102     if (bep) epfree(bep, true);
103 greg 2.1 efree(p);
104     }
105    
106 greg 2.15 // Get format identifier
107     const char *
108     formstr(int f)
109     {
110     switch (f) {
111     case 'a': return("ascii");
112     case 'f': return("float");
113     case 'd': return("double");
114     case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115     }
116     return("unknown");
117     }
118    
119     // Our default data share function
120     RdataShare *
121     defDataShare(const char *name, RCOutputOp op, size_t siz)
122     {
123     return new RdataShareMap(name, RSDOflags[op], siz);
124     }
125    
126 greg 2.1 // Set output format ('f', 'd', or 'c')
127     bool
128     RcontribSimulManager::SetDataFormat(int ty)
129     {
130     if (outList) {
131     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
132     return false;
133     }
134     switch (ty) {
135     case 'f':
136     dsiz = sizeof(float)*NCSAMP;
137     break;
138     case 'd':
139     dsiz = sizeof(double)*NCSAMP;
140     break;
141     case 'c':
142     dsiz = LSCOLR;
143     break;
144     default:
145     sprintf(errmsg, "unsupported output format '%c'", ty);
146     error(INTERNAL, errmsg);
147     return false;
148     }
149     dtyp = ty;
150     return true;
151     }
152    
153     // static call-back for rcontrib ray-tracing
154     int
155     RcontribSimulManager::RctCall(RAY *r, void *cd)
156     {
157     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
158     return 0;
159     // shadow ray not on source?
160     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
161     return 0;
162    
163     const char * mname = objptr(r->ro->omod)->oname;
164     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
165     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
166     if (!mp)
167     return 0; // not in our modifier list
168    
169 greg 2.4 int bi = 0; // get bin index
170     if (mp->binv) {
171 greg 2.5 worldfunc(RCCONTEXT, r);
172 greg 2.4 set_eparams(mp->params);
173     double bval = evalue(mp->binv);
174     if (bval <= -.5)
175     return 0; // silently ignore negative bin index
176     bi = int(bval + .5);
177     }
178     DCOLORV * dvp = (*mp)[bi];
179 greg 2.1 if (!dvp) {
180 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
181 greg 2.1 error(WARNING, errmsg);
182     return 0;
183     }
184     SCOLOR contr;
185     raycontrib(contr, r, PRIMARY); // compute coefficient
186 greg 2.7 if (rcp->HasFlag(RCcontrib))
187 greg 2.1 smultscolor(contr, r->rcol); // -> value contribution
188 greg 2.11
189 greg 2.1 for (int i = 0; i < NCSAMP; i++)
190     *dvp++ += contr[i]; // accumulate color/spectrum
191     return 1;
192     }
193    
// Search a printf-style string for the first conversion whose type character
// appears in match.  Returns the index of that character in tst, or 0 if
// there is none (a conversion character can never be at index 0).
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
		while (*cp && *cp != '%')	// advance to next '%'
			cp++;
		if (!*cp)
			return 0;		// ran out of string
		cp++;				// step over '%'
						// skip flags, width, etc.
		while (strchr(convChars, *cp) == NULL)
			cp++;
		if (!*cp)
			return 0;		// conversion cut off by end of string
		if (strchr(match, *cp) != NULL)
			return int(cp - tst);
		cp++;				// some other conversion, keep looking
	}
}
211    
212     // Add a modifier and arguments, opening output file(s)
213     bool
214     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
215     const char *prms, const char *binval, int bincnt)
216     {
217 greg 2.8 if (!modn | !outspec || !*modn | !*outspec) {
218 greg 2.1 error(WARNING, "ignoring bad call to AddModifier()");
219     return false;
220     }
221 greg 2.12 if (*outspec == '!') {
222     error(USER, "command output not supported by RcontribSimulManager");
223     return false;
224     }
225 greg 2.1 if (!nChan) { // initial call?
226 greg 2.7 if ((xres < 0) | (yres <= 0)) {
227 greg 2.8 error(USER, "xres, yres must be set before first modifier");
228 greg 2.7 return false;
229     }
230 greg 2.1 if (!SetDataFormat(dtyp))
231     return false;
232     nChan = NCSAMP;
233     } else if (nChan != NCSAMP) {
234 greg 2.11 error(USER, "# spectral channels must be fixed in AddModifier()");
235 greg 2.1 return false;
236     }
237     if (Ready()) {
238     error(INTERNAL, "call to AddModifier() after PrepOutput()");
239     return false;
240     }
241     LUENT * lp = lu_find(&modLUT, modn);
242     if (lp->data) {
243     sprintf(errmsg, "duplicate modifier '%s'", modn);
244     error(USER, errmsg);
245     return false;
246     }
247     if (!lp->key) // create new entry
248     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
249    
250     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
251     if (!mp) return false;
252     lp->data = (char *)mp;
253     const int modndx = hasFormat(outspec, "sb");
254     const int binndx = hasFormat(outspec, "diouxX");
255     int bin0 = 0;
256     char fnbuf[512];
257 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
258 greg 2.1 sprintf(fnbuf, outspec, bin0, modn);
259     else
260     sprintf(fnbuf, outspec, modn, bin0);
261     RcontribOutput * olast = NULL;
262     RcontribOutput * op;
263     for (op = outList; op; op = (olast=op)->next) {
264     if (strcmp(op->GetName(), fnbuf))
265     continue;
266     if (modndx) { // this ain't right
267     sprintf(errmsg, "output name collision for '%s'", fnbuf);
268     error(USER, errmsg);
269     return false;
270     }
271     if ((binndx > 0) & (xres > 0)) {
272     sprintf(fnbuf, outspec, ++bin0);
273     continue; // each bin goes to another image
274     }
275     mp->coffset = op->rowBytes; // else add to what's there
276     break;
277     }
278     if (!op) { // create new output channel?
279     op = new RcontribOutput(fnbuf);
280     if (olast) olast->next = op;
281     else outList = op;
282     }
283     mp->opl = op; // first (maybe only) output channel
284     if (modndx) // remember modifier if part of name
285     op->omod = lp->key;
286 greg 2.8 if (binndx) { // append output image/bin list
287 greg 2.1 op->rowBytes += dsiz;
288     op->obin = bin0;
289     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
290 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
291 greg 2.1 sprintf(fnbuf, outspec, bin0+bincnt, modn);
292     else
293     sprintf(fnbuf, outspec, modn, bin0+bincnt);
294     if (!op->next) {
295     olast = op;
296     olast->next = op = new RcontribOutput(fnbuf);
297     if (modndx) op->omod = lp->key;
298     op->obin = bin0+bincnt;
299     } else {
300     op = op->next;
301     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
302     "bin number mismatch in AddModifier()");
303     }
304     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
305     "row offset mismatch in AddModifier()");
306     op->rowBytes += dsiz;
307     }
308     } else // else send all results to this channel
309 greg 2.8 op->rowBytes += mp->nbins*dsiz;
310 greg 2.1 return true;
311     }
312    
313     // Add a file of modifiers with associated arguments
314     bool
315     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
316     const char *prms,
317     const char *binval, int bincnt)
318     {
319     char * path = getpath(const_cast<char *>(modfn),
320     getrlibpath(), R_OK);
321     FILE * fp;
322     if (!path || !(fp = fopen(path, "r"))) {
323     sprintf(errmsg, "cannot %s modifier file '%s'",
324     path ? "open" : "find", modfn);
325     error(SYSTEM, errmsg);
326     return false;
327     }
328     char mod[MAXSTR];
329     while (fgetword(mod, sizeof(mod), fp))
330 greg 2.11 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
331     fclose(fp);
332 greg 2.1 return false;
333 greg 2.11 }
334 greg 2.1 fclose(fp);
335     return true;
336     }
337    
338 greg 2.4 // call-back to check if modifier has been loaded
339     static int
340     checkModExists(const LUENT *lp, void *p)
341     {
342 greg 2.11 OBJECT mod = modifier(lp->key);
343    
344     if ((mod != OVOID) & (mod < nsceneobjs))
345 greg 2.4 return 1;
346    
347     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
348     error(WARNING, errmsg);
349     return 0;
350     }
351    
352 greg 2.1 // Prepare output channels and return # completed rows
353     int
354     RcontribSimulManager::PrepOutput()
355     {
356     if (!outList || !RtraceSimulManager::Ready()) {
357 greg 2.11 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
358 greg 2.1 return -1;
359     }
360 greg 2.5 if (!cdsF) {
361 greg 2.11 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
362 greg 2.5 return -1;
363     }
364 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
365     return -1;
366    
367 greg 2.11 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
368 greg 2.1 int remWarnings = 20;
369     for (RcontribOutput *op = outList; op; op = op->next) {
370     if (op->rData) {
371     error(INTERNAL, "output channel already open in PrepOutput()");
372     return -1;
373     }
374 greg 2.11 op->nRows = outList->nRows;
375 greg 2.1 op->rData = (*cdsF)(op->ofname, outOp,
376     GetHeadLen()+1024 + op->nRows*op->rowBytes);
377     freeqstr(op->ofname); op->ofname = NULL;
378     if (outOp == RCOrecover) {
379     int rd = op->CheckHeader(this);
380     if (rd < 0)
381     return -1;
382     if (rd >= op->nRows) {
383     if (remWarnings >= 0) {
384 greg 2.11 sprintf(errmsg, "recovered output '%s' is complete",
385 greg 2.1 op->GetName());
386 greg 2.11 error(WARNING, --remWarnings ? errmsg : "etc...");
387 greg 2.1 }
388     rd = op->nRows;
389     }
390     if (!rInPos | (rInPos > rd)) rInPos = rd;
391     } else if (!op->NewHeader(this))
392     return -1;
393     // make sure there's room
394     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
395     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
396     return -1; // calls error() for us
397     }
398     rowsDone.NewBitMap(outList->nRows); // create row completion map
399     rowsDone.ClearBits(0, rInPos, true);
400     return rInPos;
401     }
402    
403     // Create header in open write-only channel
404     bool
405     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
406     {
407     int headlim = rcp->GetHeadLen() + 1024;
408     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
409     int esiz;
410     const int etyp = rcp->GetFormat(&esiz);
411    
412     strcpy(hdr, HDRSTR);
413     strcat(hdr, "RADIANCE\n");
414     begData = strlen(hdr);
415     strcpy(hdr+begData, rcp->GetHeadStr());
416     begData += rcp->GetHeadLen();
417     if (omod) {
418     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
419     begData += strlen(hdr+begData);
420     }
421     if (obin >= 0) {
422     sprintf(hdr+begData, "BIN=%d\n", obin);
423     begData += strlen(hdr+begData);
424     }
425     strcpy(hdr+begData, ROWZEROSTR);
426     rowCountPos = begData+LNROWSTR;
427     begData += sizeof(ROWZEROSTR)-1;
428 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
429 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
430     begData += strlen(hdr+begData);
431     }
432     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
433     begData += strlen(hdr+begData);
434     if (NCSAMP > 3) {
435     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
436     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
437     begData += strlen(hdr+begData);
438     }
439     if (etyp != 'c') {
440     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
441     begData += strlen(hdr+begData);
442     }
443     int align = 0;
444     switch (etyp) {
445     case 'f':
446     align = sizeof(float);
447     break;
448     case 'd':
449     align = sizeof(double);
450     break;
451     case 'c':
452     break;
453     default:
454     error(INTERNAL, "unsupported data type in NewHeader()");
455     return false;
456     }
457     strcpy(hdr+begData, FMTSTR); // write format string
458     begData += strlen(hdr+begData);
459     strcpy(hdr+begData, formstr(etyp));
460     begData += strlen(hdr+begData);
461     if (align) // align data at end of header
462     while ((begData+2) % align)
463     hdr[begData++] = ' ';
464     hdr[begData++] = '\n'; // EOL for data format
465     hdr[begData++] = '\n'; // end of nominal header
466 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
467     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
468 greg 2.1 begData += strlen(hdr+begData);
469     }
470     return rData->ReleaseMemory(hdr, RDSwrite);
471     }
472    
473     // find string argument in header for named variable
474     static const char *
475     findArgs(const char *hdr, const char *vnm, int len)
476     {
477     const char * npos = strnstr(hdr, vnm, len);
478    
479     if (!npos) return NULL;
480    
481     npos += strlen(vnm); // find start of (first) argument
482     len -= npos - hdr;
483     while (len-- > 0 && (*npos == '=') | isspace(*npos))
484     if (*npos++ == '\n')
485     return NULL;
486    
487     return (len >= 0) ? npos : NULL;
488     }
489    
490     // Load and check header in read/write channel
491     int
492     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
493     {
494     int esiz;
495     const int etyp = rcp->GetFormat(&esiz);
496     const int maxlen = rcp->GetHeadLen() + 1024;
497     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
498     const char * cp;
499     // find end of header
500     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
501     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
502     error(USER, errmsg);
503     return -1;
504     }
505     begData = cp - hdr + 1; // increment again at end
506     // check # components
507     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
508     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
509     error(USER, errmsg);
510     return -1;
511     }
512     // check format
513 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
514     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
515 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
516     error(USER, errmsg);
517     return -1;
518     }
519     // check #columns
520     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
521 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
522 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
523     int(rowBytes/esiz), GetName());
524     error(USER, errmsg);
525     return -1;
526     }
527     // find row count
528     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
529     sprintf(errmsg, "missing NROWS in '%s'", GetName());
530     error(USER, errmsg);
531     return -1;
532     }
533     rowCountPos = cp - hdr;
534     int rlast = atoi(cp);
535     begData++; // advance past closing EOL
536 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
537 greg 2.1 char rbuf[64];
538 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
539 greg 2.1 int rlen = strlen(rbuf);
540     if (strncmp(rbuf, hdr+begData, rlen)) {
541     sprintf(errmsg, "bad resolution string in '%s'", GetName());
542     error(USER, errmsg);
543     return -1;
544     }
545     begData += rlen;
546     }
547     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
548     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
549     }
550    
551     // Rewind calculation (previous results unchanged)
552     bool
553     RcontribSimulManager::ResetRow(int r)
554     {
555     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
556     error(WARNING, "ignoring bad call to ResetRow()");
557     return (r == rInPos);
558     }
559     FlushQueue(); // finish current and reset
560     for (RcontribOutput *op = outList; op; op = op->next)
561     if (!op->SetRowsDone(r))
562     return false;
563    
564     rowsDone.ClearBits(r, rInPos-r, false);
565     rInPos = r;
566     return true;
567     }
568    
569     // call-back for averaging each modifier's contributions to assigned channel(s)
570     static int
571     putModContrib(const LUENT *lp, void *p)
572     {
573     RcontribMod * mp = (RcontribMod *)lp->data;
574     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
575     const double sca = 1./rcp->accum;
576     const DCOLORV * dvp = mp->cbin;
577     RcontribOutput * op = mp->opl;
578     int i, n;
579    
580     switch (rcp->GetFormat()) { // conversion based on output type
581     case 'd': {
582     double * dvo = (double *)op->InsertionP(mp->coffset);
583     for (n = mp->nbins; n--; ) {
584     for (i = 0; i < NCSAMP; i++)
585     *dvo++ = *dvp++ * sca;
586     if ((op->obin >= 0) & (n > 0))
587     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
588     }
589     } break;
590     case 'f': {
591     float * fvo = (float *)op->InsertionP(mp->coffset);
592     for (n = mp->nbins; n--; ) {
593     for (i = 0; i < NCSAMP; i++)
594     *fvo++ = float(*dvp++ * sca);
595     if ((op->obin >= 0) & (n > 0))
596     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
597     }
598     } break;
599     case 'c': {
600     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
601     for (n = mp->nbins; n--; ) {
602     SCOLOR scol;
603     for (i = 0; i < NCSAMP; i++)
604     scol[i] = COLORV(*dvp++ * sca);
605     scolor_scolr(cvo, scol);
606     cvo += LSCOLR;
607     if ((op->obin >= 0) & (n > 0))
608     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
609     }
610     } break;
611     default:
612 greg 2.13 error(CONSISTENCY, "unsupported output type in putModContrib()");
613 greg 2.1 return -1;
614     }
615     // clear for next tally
616     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
617     return 1;
618     }
619    
620     // Add a ray/bundle to compute next record
621     int
622     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
623     {
624     if (!Ready())
625     return 0;
626     if (rInPos >= outList->nRows) {
627     error(WARNING, "ComputeRecord() called after last record");
628     return 0;
629     }
630     if (nkids > 0) { // in parent process?
631 greg 2.14 int k = GetChild(false); // updates output rows
632     if (k < 0) return -1; // someone died?
633 greg 2.1 RowAssignment rass;
634     rass.row = kidRow[k] = rInPos++;
635     rass.ac = accum;
636     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
637     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
638     != sizeof(FVECT)*2*accum) {
639     error(SYSTEM, "cannot write to child; dead process?");
640     return -1;
641     }
642     return accum; // tracing/output happens in child
643     }
644     if (NCSAMP != nChan) {
645     error(INTERNAL, "number of color channels must remain fixed");
646     return -1;
647     }
648     // actual work is done here...
649     if (EnqueueBundle(orig_direc, accum) < 0)
650     return -1;
651    
652     RcontribOutput * op; // prepare output buffers
653     for (op = outList; op; op = op->next)
654     if (!op->GetRow(rInPos))
655     return -1;
656     // convert averages & clear
657     if (lu_doall(&modLUT, putModContrib, this) < 0)
658     return -1;
659     // write buffers
660     for (op = outList; op; op = op->next)
661     op->DoneRow();
662    
663     if (!nkids) // update row if solo process
664     UpdateRowsDone(rInPos++); // XXX increment here is critical
665    
666     return accum;
667     }
668    
669     // Get next available child, returning index or -1 if forceWait & idling
670     int
671     RcontribSimulManager::GetChild(bool forceWait)
672     {
673     if (nkids <= 0)
674     return -1;
675     // take inventory
676     int pn, n = 0;
677 greg 2.14 fd_set readset, errset;
678     FD_ZERO(&readset); FD_ZERO(&errset);
679 greg 2.1 for (pn = nkids; pn--; ) {
680     if (kidRow[pn] < 0) { // child already ready?
681     if (forceWait) continue;
682     return pn; // good enough
683     }
684 greg 2.14 FD_SET(kid[pn].r, &readset); // will check on this one
685     FD_SET(kid[pn].r, &errset);
686     if (kid[pn].r >= n)
687     n = kid[pn].r + 1;
688 greg 2.1 }
689     if (!n) // every child is idle?
690     return -1;
691     // wait on "busy" child(ren)
692 greg 2.14 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
693 greg 2.1 if (errno != EINTR) {
694     error(SYSTEM, "select call failed in GetChild()");
695     return -1;
696     }
697 greg 2.14 char buf[sizeof(ROW_DONE)] = "X";
698 greg 2.1 pn = -1; // get flags set by select
699     for (n = nkids; n--; )
700     if (kidRow[n] >= 0 &&
701 greg 2.14 FD_ISSET(kid[n].r, &readset) |
702     FD_ISSET(kid[n].r, &errset)) {
703     // check for error
704     if (FD_ISSET(kid[n].r, &errset) ||
705     read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
706     memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
707     return -1;
708 greg 2.1 // update output row counts
709 greg 2.14 UpdateRowsDone(kidRow[n]);
710     kidRow[n] = -1; // flag child available
711 greg 2.1 pn = n;
712     }
713     return pn;
714     }
715    
716     // Update row completion bitmap and update outputs (does not change rInPos)
717     bool
718     RcontribSimulManager::UpdateRowsDone(int r)
719     {
720     if (!rowsDone.TestAndSet(r)) {
721     error(WARNING, "redundant call to UpdateRowsDone()");
722     return false;
723     }
724     int nDone = GetRowFinished();
725     if (nDone <= r)
726     return true; // nothing to update, yet
727     for (RcontribOutput *op = outList; op; op = op->next)
728     if (!op->SetRowsDone(nDone))
729     return false;
730     return true; // up-to-date
731     }
732    
733     // Run rcontrib child process (never returns)
734     void
735     RcontribSimulManager::RunChild()
736     {
737     FVECT * vecList = NULL;
738     RowAssignment rass;
739     ssize_t nr;
740    
741     accum = 0;
742     errno = 0;
743     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
744     if (!rass.ac) {
745     error(CONSISTENCY, "bad accumulator count in child");
746     exit(1);
747     }
748 greg 2.13 if (rass.ac > accum) {
749     efree(vecList);
750     vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
751     }
752 greg 2.1 accum = rass.ac;
753     rInPos = rass.row;
754    
755     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
756     sizeof(FVECT)*2*accum)
757     break;
758    
759     if (ComputeRecord(vecList) <= 0)
760     exit(1);
761 greg 2.14 // signal this row is done
762     if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
763     exit(1);
764 greg 2.1 }
765     if (nr) {
766     error(SYSTEM, "read error in child process");
767     exit(1);
768     }
769     exit(0);
770     }
771    
772     // Add child processes as indicated
773     bool
774     RcontribSimulManager::StartKids(int n2go)
775     {
776     if ((n2go <= 0) | (nkids + n2go <= 1))
777     return false;
778    
779     if (!nkids)
780     cow_memshare(); // preload objects
781    
782     n2go += nkids; // => desired brood size
783     kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
784     kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
785    
786     fflush(stdout); // shouldn't use, anyway
787     while (nkids < n2go) {
788     kid[nkids] = sp_inactive;
789     int rv = open_process(&kid[nkids], NULL);
790     if (!rv) { // in child process?
791 greg 2.14 while (nkids-- > 0) {
792     close(kid[nkids].r);
793 greg 2.1 close(kid[nkids].w);
794 greg 2.14 }
795 greg 2.1 free(kid); free(kidRow);
796     kid = NULL; kidRow = NULL;
797     RunChild(); // should never return
798     _exit(1);
799     }
800     if (rv < 0) {
801     error(SYSTEM, "cannot fork worker process");
802     return false;
803     }
804     kidRow[nkids++] = -1; // newborn is ready
805     }
806     return true;
807     }
808    
809     // Reap the indicated number of children (all if 0)
810     int
811     RcontribSimulManager::StopKids(int n2end)
812     {
813     if (nkids <= 0)
814     return 0;
815     FlushQueue();
816     int status = 0;
817     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
818     status = close_processes(kid, nkids);
819     free(kid); free(kidRow);
820     kid = NULL; kidRow = NULL;
821     nkids = 0;
822     } else { // else shrink family
823     int st;
824     while (n2end-- > 0)
825     if ((st = close_process(&kid[--nkids])) > 0)
826     status = st;
827     // could call realloc(), but it hardly matters
828     }
829     if (status) {
830     sprintf(errmsg, "non-zero (%d) status from child", status);
831     error(WARNING, errmsg);
832     }
833     return status;
834     }
835    
836     // Set number of computation threads (0 => #cores)
837     int
838     RcontribSimulManager::SetThreadCount(int nt)
839     {
840     if (!Ready()) {
841     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
842     return 0;
843     }
844     if (nt < 0)
845 greg 2.6 return NThreads();
846 greg 2.1 if (!nt) nt = GetNCores();
847     int status = 0;
848     if (nt == 1)
849     status = StopKids();
850     else if (nt < nkids)
851     status = StopKids(nkids-nt);
852     else if (nt > nkids)
853     StartKids(nt-nkids);
854     if (status) {
855     sprintf(errmsg, "non-zero (%d) status from child", status);
856     error(WARNING, errmsg);
857     }
858 greg 2.6 return NThreads();
859 greg 2.1 }