ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.12
Committed: Thu Oct 16 18:36:23 2025 UTC (2 weeks, 5 days ago) by greg
Branch: MAIN
Changes since 2.11: +5 -1 lines
Log Message:
fix: Added missing error diagnostics

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.12 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.11 2025/01/02 16:16:49 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * RcontribSimulManager.cpp
6     *
7     * Rcontrib simulation manager implementation
8     *
9     * Created by Greg Ward on 10/17/2024.
10     */
11    
12     #include <unistd.h>
13     #include <ctype.h>
14     #include "platform.h"
15     #include "selcall.h"
16     #include "RcontribSimulManager.h"
17     #include "func.h"
18     #include "resolu.h"
19     #include "source.h"
20    
// Context name passed to worldfunc() before evaluating a bin expression
char RCCONTEXT[] = "RC.";

// Header strings defined elsewhere; used below to write and check headers
extern const char HDRSTR[];
extern const char BIGEND[];
extern const char FMTSTR[];

// new/exclusive, overwrite if exists, or recover data
// (indexed by the RCOutputOp value handed to defDataShare())
int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
			RDSread|RDSwrite};

// Placeholder row count written into each new header by NewHeader()
static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
// Offset of the first digit in ROWZEROSTR, i.e., the length of "NROWS="
#define LNROWSTR 6
33    
// Modifier channel for recording contributions (no constructor/destructor)
// NewRcMod() allocates this as a single block: the struct itself, then
// the remaining bin accumulators, then a copy of the parameter string.
struct RcontribMod {
	RcontribOutput *	opl;		// pointer to first output channel
	char *			params;		// parameters string
	EPNODE *		binv;		// bin expression (NULL if 1 bin)
	int			nbins;		// bin count this modifier
	int			coffset;	// column offset in bytes
	DCOLORV			cbin[1];	// bin accumulator (extends struct)
	/// Access specific (spectral) color bin
	/// Returns NULL if n is outside [0,nbins), otherwise a pointer to
	/// the first of the NCSAMP accumulator values for bin n
	DCOLORV *	operator[](int n) {
				if ((n < 0) | (n >= nbins)) return NULL;
				return cbin + n*NCSAMP;
			}
};
48    
// Struct used to assign record calculation to child
// Written as raw bytes by ComputeRecord() in the parent and read back
// the same way by RunChild()
struct RowAssignment {
	uint32		row;		// row to do
	uint32		ac;		// accumulation count
};
54    
55 greg 2.9 // Get format identifier
56     const char *
57     formstr(int f)
58     {
59     switch (f) {
60     case 'a': return("ascii");
61     case 'f': return("float");
62     case 'd': return("double");
63     case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64     }
65     return("unknown");
66     }
67    
68 greg 2.1 // Our default data share function
69     RdataShare *
70     defDataShare(const char *name, RCOutputOp op, size_t siz)
71     {
72     return new RdataShareMap(name, RSDOflags[op], siz);
73     }
74    
75     // Allocate rcontrib accumulator
76     RcontribMod *
77     NewRcMod(const char *prms, const char *binexpr, int ncbins)
78     {
79     if (!prms) prms = "";
80 greg 2.10 if ((ncbins > 1) & !binexpr) {
81 greg 2.8 error(USER, "missing bin expression");
82 greg 2.4 return NULL;
83     }
84 greg 2.10 if (ncbins < 1) ncbins = 1;
85    
86 greg 2.1 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
87     sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
88     strlen(prms)+1);
89    
90 greg 2.11 if (binexpr) { // get/check bin expression
91 greg 2.4 mp->binv = eparse(const_cast<char *>(binexpr));
92 greg 2.10 if (mp->binv->type == NUM) { // constant expression (0)?
93 greg 2.11 if ((int)evalue(mp->binv) != 0) {
94     sprintf(errmsg, "illegal non-zero constant for bin (%s)",
95 greg 2.10 binexpr);
96     error(USER, errmsg);
97     }
98     if (ncbins > 1) {
99     sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
100     error(USER, errmsg);
101     }
102     epfree(mp->binv, true);
103     mp->binv = NULL;
104     prms = "";
105     ncbins = 1;
106     }
107 greg 2.4 }
108 greg 2.10 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
109 greg 2.1 mp->nbins = ncbins;
110     return mp;
111     }
112    
113     // Free an RcontribMod
114     void
115     FreeRcMod(void *p)
116     {
117     if (!p) return;
118 greg 2.4 EPNODE * bep = (*(RcontribMod *)p).binv;
119     if (bep) epfree(bep, true);
120 greg 2.1 efree(p);
121     }
122    
123     // Set output format ('f', 'd', or 'c')
124     bool
125     RcontribSimulManager::SetDataFormat(int ty)
126     {
127     if (outList) {
128     error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
129     return false;
130     }
131     switch (ty) {
132     case 'f':
133     dsiz = sizeof(float)*NCSAMP;
134     break;
135     case 'd':
136     dsiz = sizeof(double)*NCSAMP;
137     break;
138     case 'c':
139     dsiz = LSCOLR;
140     break;
141     default:
142     sprintf(errmsg, "unsupported output format '%c'", ty);
143     error(INTERNAL, errmsg);
144     return false;
145     }
146     dtyp = ty;
147     return true;
148     }
149    
150     // static call-back for rcontrib ray-tracing
151     int
152     RcontribSimulManager::RctCall(RAY *r, void *cd)
153     {
154     if (!r->ro || r->ro->omod == OVOID) // hit nothing?
155     return 0;
156     // shadow ray not on source?
157     if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
158     return 0;
159    
160     const char * mname = objptr(r->ro->omod)->oname;
161     RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
162     RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
163     if (!mp)
164     return 0; // not in our modifier list
165    
166 greg 2.4 int bi = 0; // get bin index
167     if (mp->binv) {
168 greg 2.5 worldfunc(RCCONTEXT, r);
169 greg 2.4 set_eparams(mp->params);
170     double bval = evalue(mp->binv);
171     if (bval <= -.5)
172     return 0; // silently ignore negative bin index
173     bi = int(bval + .5);
174     }
175     DCOLORV * dvp = (*mp)[bi];
176 greg 2.1 if (!dvp) {
177 greg 2.4 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
178 greg 2.1 error(WARNING, errmsg);
179     return 0;
180     }
181     SCOLOR contr;
182     raycontrib(contr, r, PRIMARY); // compute coefficient
183 greg 2.7 if (rcp->HasFlag(RCcontrib))
184 greg 2.1 smultscolor(contr, r->rcol); // -> value contribution
185 greg 2.11
186 greg 2.1 for (int i = 0; i < NCSAMP; i++)
187     *dvp++ += contr[i]; // accumulate color/spectrum
188     return 1;
189     }
190    
// Look for the first printf-style conversion in tst whose conversion
// character appears in match.  Returns the index of that conversion
// character within tst, or 0 if there is none.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; *cp != '\0'; ) {
		if (*cp++ != '%')
			continue;
					// skip flags/width up to conversion char
					// (strchr also stops us at the terminator)
		while (strchr(convChars, *cp) == NULL)
			cp++;
		if (strchr(match, *cp) != NULL)
			break;
		cp++;
	}
	if (*cp == '\0')
		return 0;
	return int(cp - tst);
}
208    
209     // Add a modifier and arguments, opening output file(s)
210     bool
211     RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
212     const char *prms, const char *binval, int bincnt)
213     {
214 greg 2.8 if (!modn | !outspec || !*modn | !*outspec) {
215 greg 2.1 error(WARNING, "ignoring bad call to AddModifier()");
216     return false;
217     }
218 greg 2.12 if (*outspec == '!') {
219     error(USER, "command output not supported by RcontribSimulManager");
220     return false;
221     }
222 greg 2.1 if (!nChan) { // initial call?
223 greg 2.7 if ((xres < 0) | (yres <= 0)) {
224 greg 2.8 error(USER, "xres, yres must be set before first modifier");
225 greg 2.7 return false;
226     }
227 greg 2.1 if (!SetDataFormat(dtyp))
228     return false;
229     nChan = NCSAMP;
230     } else if (nChan != NCSAMP) {
231 greg 2.11 error(USER, "# spectral channels must be fixed in AddModifier()");
232 greg 2.1 return false;
233     }
234     if (Ready()) {
235     error(INTERNAL, "call to AddModifier() after PrepOutput()");
236     return false;
237     }
238     LUENT * lp = lu_find(&modLUT, modn);
239     if (lp->data) {
240     sprintf(errmsg, "duplicate modifier '%s'", modn);
241     error(USER, errmsg);
242     return false;
243     }
244     if (!lp->key) // create new entry
245     lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
246    
247     RcontribMod * mp = NewRcMod(prms, binval, bincnt);
248     if (!mp) return false;
249     lp->data = (char *)mp;
250     const int modndx = hasFormat(outspec, "sb");
251     const int binndx = hasFormat(outspec, "diouxX");
252     int bin0 = 0;
253     char fnbuf[512];
254 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
255 greg 2.1 sprintf(fnbuf, outspec, bin0, modn);
256     else
257     sprintf(fnbuf, outspec, modn, bin0);
258     RcontribOutput * olast = NULL;
259     RcontribOutput * op;
260     for (op = outList; op; op = (olast=op)->next) {
261     if (strcmp(op->GetName(), fnbuf))
262     continue;
263     if (modndx) { // this ain't right
264     sprintf(errmsg, "output name collision for '%s'", fnbuf);
265     error(USER, errmsg);
266     return false;
267     }
268     if ((binndx > 0) & (xres > 0)) {
269     sprintf(fnbuf, outspec, ++bin0);
270     continue; // each bin goes to another image
271     }
272     mp->coffset = op->rowBytes; // else add to what's there
273     break;
274     }
275     if (!op) { // create new output channel?
276     op = new RcontribOutput(fnbuf);
277     if (olast) olast->next = op;
278     else outList = op;
279     }
280     mp->opl = op; // first (maybe only) output channel
281     if (modndx) // remember modifier if part of name
282     op->omod = lp->key;
283 greg 2.8 if (binndx) { // append output image/bin list
284 greg 2.1 op->rowBytes += dsiz;
285     op->obin = bin0;
286     for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
287 greg 2.8 if (!modndx || (binndx > 0) & (modndx > binndx))
288 greg 2.1 sprintf(fnbuf, outspec, bin0+bincnt, modn);
289     else
290     sprintf(fnbuf, outspec, modn, bin0+bincnt);
291     if (!op->next) {
292     olast = op;
293     olast->next = op = new RcontribOutput(fnbuf);
294     if (modndx) op->omod = lp->key;
295     op->obin = bin0+bincnt;
296     } else {
297     op = op->next;
298     CHECK(op->obin != bin0+bincnt, CONSISTENCY,
299     "bin number mismatch in AddModifier()");
300     }
301     CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
302     "row offset mismatch in AddModifier()");
303     op->rowBytes += dsiz;
304     }
305     } else // else send all results to this channel
306 greg 2.8 op->rowBytes += mp->nbins*dsiz;
307 greg 2.1 return true;
308     }
309    
310     // Add a file of modifiers with associated arguments
311     bool
312     RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
313     const char *prms,
314     const char *binval, int bincnt)
315     {
316     char * path = getpath(const_cast<char *>(modfn),
317     getrlibpath(), R_OK);
318     FILE * fp;
319     if (!path || !(fp = fopen(path, "r"))) {
320     sprintf(errmsg, "cannot %s modifier file '%s'",
321     path ? "open" : "find", modfn);
322     error(SYSTEM, errmsg);
323     return false;
324     }
325     char mod[MAXSTR];
326     while (fgetword(mod, sizeof(mod), fp))
327 greg 2.11 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
328     fclose(fp);
329 greg 2.1 return false;
330 greg 2.11 }
331 greg 2.1 fclose(fp);
332     return true;
333     }
334    
335 greg 2.4 // call-back to check if modifier has been loaded
336     static int
337     checkModExists(const LUENT *lp, void *p)
338     {
339 greg 2.11 OBJECT mod = modifier(lp->key);
340    
341     if ((mod != OVOID) & (mod < nsceneobjs))
342 greg 2.4 return 1;
343    
344     sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
345     error(WARNING, errmsg);
346     return 0;
347     }
348    
349 greg 2.1 // Prepare output channels and return # completed rows
350     int
351     RcontribSimulManager::PrepOutput()
352     {
353     if (!outList || !RtraceSimulManager::Ready()) {
354 greg 2.11 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
355 greg 2.1 return -1;
356     }
357 greg 2.5 if (!cdsF) {
358 greg 2.11 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
359 greg 2.5 return -1;
360     }
361 greg 2.4 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
362     return -1;
363    
364 greg 2.11 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
365 greg 2.1 int remWarnings = 20;
366     for (RcontribOutput *op = outList; op; op = op->next) {
367     if (op->rData) {
368     error(INTERNAL, "output channel already open in PrepOutput()");
369     return -1;
370     }
371 greg 2.11 op->nRows = outList->nRows;
372 greg 2.1 op->rData = (*cdsF)(op->ofname, outOp,
373     GetHeadLen()+1024 + op->nRows*op->rowBytes);
374     freeqstr(op->ofname); op->ofname = NULL;
375     if (outOp == RCOrecover) {
376     int rd = op->CheckHeader(this);
377     if (rd < 0)
378     return -1;
379     if (rd >= op->nRows) {
380     if (remWarnings >= 0) {
381 greg 2.11 sprintf(errmsg, "recovered output '%s' is complete",
382 greg 2.1 op->GetName());
383 greg 2.11 error(WARNING, --remWarnings ? errmsg : "etc...");
384 greg 2.1 }
385     rd = op->nRows;
386     }
387     if (!rInPos | (rInPos > rd)) rInPos = rd;
388     } else if (!op->NewHeader(this))
389     return -1;
390     // make sure there's room
391     if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
392     !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
393     return -1; // calls error() for us
394     }
395     rowsDone.NewBitMap(outList->nRows); // create row completion map
396     rowsDone.ClearBits(0, rInPos, true);
397     return rInPos;
398     }
399    
400     // Create header in open write-only channel
401     bool
402     RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
403     {
404     int headlim = rcp->GetHeadLen() + 1024;
405     char * hdr = (char *)rData->GetMemory(0, headlim, 0);
406     int esiz;
407     const int etyp = rcp->GetFormat(&esiz);
408    
409     strcpy(hdr, HDRSTR);
410     strcat(hdr, "RADIANCE\n");
411     begData = strlen(hdr);
412     strcpy(hdr+begData, rcp->GetHeadStr());
413     begData += rcp->GetHeadLen();
414     if (omod) {
415     sprintf(hdr+begData, "MODIFIER=%s\n", omod);
416     begData += strlen(hdr+begData);
417     }
418     if (obin >= 0) {
419     sprintf(hdr+begData, "BIN=%d\n", obin);
420     begData += strlen(hdr+begData);
421     }
422     strcpy(hdr+begData, ROWZEROSTR);
423     rowCountPos = begData+LNROWSTR;
424     begData += sizeof(ROWZEROSTR)-1;
425 greg 2.7 if (!rcp->xres | (rowBytes > esiz)) {
426 greg 2.1 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
427     begData += strlen(hdr+begData);
428     }
429     sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
430     begData += strlen(hdr+begData);
431     if (NCSAMP > 3) {
432     sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
433     WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
434     begData += strlen(hdr+begData);
435     }
436     if (etyp != 'c') {
437     sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
438     begData += strlen(hdr+begData);
439     }
440     int align = 0;
441     switch (etyp) {
442     case 'f':
443     align = sizeof(float);
444     break;
445     case 'd':
446     align = sizeof(double);
447     break;
448     case 'c':
449     break;
450     default:
451     error(INTERNAL, "unsupported data type in NewHeader()");
452     return false;
453     }
454     strcpy(hdr+begData, FMTSTR); // write format string
455     begData += strlen(hdr+begData);
456     strcpy(hdr+begData, formstr(etyp));
457     begData += strlen(hdr+begData);
458     if (align) // align data at end of header
459     while ((begData+2) % align)
460     hdr[begData++] = ' ';
461     hdr[begData++] = '\n'; // EOL for data format
462     hdr[begData++] = '\n'; // end of nominal header
463 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
464     sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
465 greg 2.1 begData += strlen(hdr+begData);
466     }
467     return rData->ReleaseMemory(hdr, RDSwrite);
468     }
469    
470     // find string argument in header for named variable
471     static const char *
472     findArgs(const char *hdr, const char *vnm, int len)
473     {
474     const char * npos = strnstr(hdr, vnm, len);
475    
476     if (!npos) return NULL;
477    
478     npos += strlen(vnm); // find start of (first) argument
479     len -= npos - hdr;
480     while (len-- > 0 && (*npos == '=') | isspace(*npos))
481     if (*npos++ == '\n')
482     return NULL;
483    
484     return (len >= 0) ? npos : NULL;
485     }
486    
487     // Load and check header in read/write channel
488     int
489     RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
490     {
491     int esiz;
492     const int etyp = rcp->GetFormat(&esiz);
493     const int maxlen = rcp->GetHeadLen() + 1024;
494     char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
495     const char * cp;
496     // find end of header
497     if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
498     sprintf(errmsg, "cannot find end of header in '%s'", GetName());
499     error(USER, errmsg);
500     return -1;
501     }
502     begData = cp - hdr + 1; // increment again at end
503     // check # components
504     if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
505     sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
506     error(USER, errmsg);
507     return -1;
508     }
509     // check format
510 greg 2.3 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
511     strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
512 greg 2.1 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
513     error(USER, errmsg);
514     return -1;
515     }
516     // check #columns
517     if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
518 greg 2.7 !rcp->xres | (rowBytes > esiz)) {
519 greg 2.1 sprintf(errmsg, "expected NCOLS=%d in '%s'",
520     int(rowBytes/esiz), GetName());
521     error(USER, errmsg);
522     return -1;
523     }
524     // find row count
525     if (!(cp = findArgs(hdr, "NROWS=", begData))) {
526     sprintf(errmsg, "missing NROWS in '%s'", GetName());
527     error(USER, errmsg);
528     return -1;
529     }
530     rowCountPos = cp - hdr;
531     int rlast = atoi(cp);
532     begData++; // advance past closing EOL
533 greg 2.7 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
534 greg 2.1 char rbuf[64];
535 greg 2.7 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
536 greg 2.1 int rlen = strlen(rbuf);
537     if (strncmp(rbuf, hdr+begData, rlen)) {
538     sprintf(errmsg, "bad resolution string in '%s'", GetName());
539     error(USER, errmsg);
540     return -1;
541     }
542     begData += rlen;
543     }
544     // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
545     return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
546     }
547    
548     // Rewind calculation (previous results unchanged)
549     bool
550     RcontribSimulManager::ResetRow(int r)
551     {
552     if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
553     error(WARNING, "ignoring bad call to ResetRow()");
554     return (r == rInPos);
555     }
556     FlushQueue(); // finish current and reset
557     for (RcontribOutput *op = outList; op; op = op->next)
558     if (!op->SetRowsDone(r))
559     return false;
560    
561     rowsDone.ClearBits(r, rInPos-r, false);
562     rInPos = r;
563     return true;
564     }
565    
566     // call-back for averaging each modifier's contributions to assigned channel(s)
567     static int
568     putModContrib(const LUENT *lp, void *p)
569     {
570     RcontribMod * mp = (RcontribMod *)lp->data;
571     const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
572     const double sca = 1./rcp->accum;
573     const DCOLORV * dvp = mp->cbin;
574     RcontribOutput * op = mp->opl;
575     int i, n;
576    
577     switch (rcp->GetFormat()) { // conversion based on output type
578     case 'd': {
579     double * dvo = (double *)op->InsertionP(mp->coffset);
580     for (n = mp->nbins; n--; ) {
581     for (i = 0; i < NCSAMP; i++)
582     *dvo++ = *dvp++ * sca;
583     if ((op->obin >= 0) & (n > 0))
584     dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
585     }
586     } break;
587     case 'f': {
588     float * fvo = (float *)op->InsertionP(mp->coffset);
589     for (n = mp->nbins; n--; ) {
590     for (i = 0; i < NCSAMP; i++)
591     *fvo++ = float(*dvp++ * sca);
592     if ((op->obin >= 0) & (n > 0))
593     fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
594     }
595     } break;
596     case 'c': {
597     COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
598     for (n = mp->nbins; n--; ) {
599     SCOLOR scol;
600     for (i = 0; i < NCSAMP; i++)
601     scol[i] = COLORV(*dvp++ * sca);
602     scolor_scolr(cvo, scol);
603     cvo += LSCOLR;
604     if ((op->obin >= 0) & (n > 0))
605     cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
606     }
607     } break;
608     default:
609     error(CONSISTENCY, "unsupported output type in sendModContrib()");
610     return -1;
611     }
612     // clear for next tally
613     memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
614     return 1;
615     }
616    
617     // Add a ray/bundle to compute next record
618     int
619     RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
620     {
621     if (!Ready())
622     return 0;
623     if (rInPos >= outList->nRows) {
624     error(WARNING, "ComputeRecord() called after last record");
625     return 0;
626     }
627     if (nkids > 0) { // in parent process?
628     int k = GetChild(); // updates output rows
629     if (k < 0) return -1; // can't really happen
630     RowAssignment rass;
631     rass.row = kidRow[k] = rInPos++;
632     rass.ac = accum;
633     if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
634     writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
635     != sizeof(FVECT)*2*accum) {
636     error(SYSTEM, "cannot write to child; dead process?");
637     return -1;
638     }
639     return accum; // tracing/output happens in child
640     }
641     if (NCSAMP != nChan) {
642     error(INTERNAL, "number of color channels must remain fixed");
643     return -1;
644     }
645     // actual work is done here...
646     if (EnqueueBundle(orig_direc, accum) < 0)
647     return -1;
648    
649     RcontribOutput * op; // prepare output buffers
650     for (op = outList; op; op = op->next)
651     if (!op->GetRow(rInPos))
652     return -1;
653     // convert averages & clear
654     if (lu_doall(&modLUT, putModContrib, this) < 0)
655     return -1;
656     // write buffers
657     for (op = outList; op; op = op->next)
658     op->DoneRow();
659    
660     if (!nkids) // update row if solo process
661     UpdateRowsDone(rInPos++); // XXX increment here is critical
662    
663     return accum;
664     }
665    
666     // Get next available child, returning index or -1 if forceWait & idling
667     int
668     RcontribSimulManager::GetChild(bool forceWait)
669     {
670     if (nkids <= 0)
671     return -1;
672     // take inventory
673     int pn, n = 0;
674     fd_set writeset, errset;
675     FD_ZERO(&writeset); FD_ZERO(&errset);
676     for (pn = nkids; pn--; ) {
677     if (kidRow[pn] < 0) { // child already ready?
678     if (forceWait) continue;
679     return pn; // good enough
680     }
681     FD_SET(kid[pn].w, &writeset); // will check on this one
682     FD_SET(kid[pn].w, &errset);
683     if (kid[pn].w >= n)
684     n = kid[pn].w + 1;
685     }
686     if (!n) // every child is idle?
687     return -1;
688     // wait on "busy" child(ren)
689     while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
690     if (errno != EINTR) {
691     error(SYSTEM, "select call failed in GetChild()");
692     return -1;
693     }
694     pn = -1; // get flags set by select
695     for (n = nkids; n--; )
696     if (kidRow[n] >= 0 &&
697     FD_ISSET(kid[n].w, &writeset) |
698     FD_ISSET(kid[n].w, &errset)) {
699     // update output row counts
700 greg 2.3 if (!FD_ISSET(kid[n].w, &errset))
701     UpdateRowsDone(kidRow[n]);
702 greg 2.1 kidRow[n] = -1; // flag it available
703     pn = n;
704     }
705     return pn;
706     }
707    
708     // Update row completion bitmap and update outputs (does not change rInPos)
709     bool
710     RcontribSimulManager::UpdateRowsDone(int r)
711     {
712     if (!rowsDone.TestAndSet(r)) {
713     error(WARNING, "redundant call to UpdateRowsDone()");
714     return false;
715     }
716     int nDone = GetRowFinished();
717     if (nDone <= r)
718     return true; // nothing to update, yet
719     for (RcontribOutput *op = outList; op; op = op->next)
720     if (!op->SetRowsDone(nDone))
721     return false;
722     return true; // up-to-date
723     }
724    
725     // Run rcontrib child process (never returns)
726     void
727     RcontribSimulManager::RunChild()
728     {
729     FVECT * vecList = NULL;
730     RowAssignment rass;
731     ssize_t nr;
732    
733     accum = 0;
734     errno = 0;
735     while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
736     if (!rass.ac) {
737     error(CONSISTENCY, "bad accumulator count in child");
738     exit(1);
739     }
740     if (rass.ac > accum)
741     vecList = (FVECT *)erealloc(vecList,
742     sizeof(FVECT)*2*rass.ac);
743     accum = rass.ac;
744     rInPos = rass.row;
745    
746     if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
747     sizeof(FVECT)*2*accum)
748     break;
749    
750     if (ComputeRecord(vecList) <= 0)
751     exit(1);
752     }
753     if (nr) {
754     error(SYSTEM, "read error in child process");
755     exit(1);
756     }
757     exit(0);
758     }
759    
// Add child processes as indicated
// n2go is the number of children to add to the current nkids.
// Returns false if n2go is not positive, if the resulting total would
// not exceed one, or if a worker could not be started; true otherwise.
// In a new child this call does not return (see RunChild()).
bool
RcontribSimulManager::StartKids(int n2go)
{
	if ((n2go <= 0) | (nkids + n2go <= 1))
		return false;

	if (!nkids)
		cow_memshare();			// preload objects

	n2go += nkids;				// => desired brood size
	kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
	kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);

	fflush(stdout);				// shouldn't use, anyway
	while (nkids < n2go) {
		kid[nkids] = sp_inactive;
		kid[nkids].w = dup(1);		// duplicate of our standard output
		kid[nkids].flags |= PF_FILT_OUT;
		int	rv = open_process(&kid[nkids], NULL);
		if (!rv) {			// in child process?
						// close descriptors to older siblings;
						// loop leaves nkids at -1, so ComputeRecord()
						// takes neither its parent nor solo branch
			while (nkids-- > 0)
				close(kid[nkids].w);
			free(kid); free(kidRow);
			kid = NULL; kidRow = NULL;
			RunChild();		// should never return
			_exit(1);
		}
		if (rv < 0) {
			error(SYSTEM, "cannot fork worker process");
			return false;
		}
		kidRow[nkids++] = -1;		// newborn is ready
	}
	return true;
}
796    
797     // Reap the indicated number of children (all if 0)
798     int
799     RcontribSimulManager::StopKids(int n2end)
800     {
801     if (nkids <= 0)
802     return 0;
803     FlushQueue();
804     int status = 0;
805     if (!n2end | (n2end >= nkids)) { // end all subprocesses?
806     status = close_processes(kid, nkids);
807     free(kid); free(kidRow);
808     kid = NULL; kidRow = NULL;
809     nkids = 0;
810     } else { // else shrink family
811     int st;
812     while (n2end-- > 0)
813     if ((st = close_process(&kid[--nkids])) > 0)
814     status = st;
815     // could call realloc(), but it hardly matters
816     }
817     if (status) {
818     sprintf(errmsg, "non-zero (%d) status from child", status);
819     error(WARNING, errmsg);
820     }
821     return status;
822     }
823    
824     // Set number of computation threads (0 => #cores)
825     int
826     RcontribSimulManager::SetThreadCount(int nt)
827     {
828     if (!Ready()) {
829     error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
830     return 0;
831     }
832     if (nt < 0)
833 greg 2.6 return NThreads();
834 greg 2.1 if (!nt) nt = GetNCores();
835     int status = 0;
836     if (nt == 1)
837     status = StopKids();
838     else if (nt < nkids)
839     status = StopKids(nkids-nt);
840     else if (nt > nkids)
841     StartKids(nt-nkids);
842     if (status) {
843     sprintf(errmsg, "non-zero (%d) status from child", status);
844     error(WARNING, errmsg);
845     }
846 greg 2.6 return NThreads();
847 greg 2.1 }