ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.22
Committed: Tue Nov 11 00:19:10 2025 UTC (7 weeks, 1 day ago) by greg
Branch: MAIN
Changes since 2.21: +4 -1 lines
Log Message:
perf(rcontrib,rxcontrib,rxfluxmtx): Added preemptive test for zero contribution

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.21 2025/10/27 18:38:32 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // allocate rcontrib accumulator
58 static RcontribMod *
59 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60 {
61 if (binexpr && !*binexpr) binexpr = NULL;
62 if (!prms) prms = "";
63 if ((ncbins > 1) & !binexpr) {
64 error(USER, "missing bin expression");
65 return NULL;
66 }
67 if (ncbins < 1) ncbins = 1;
68
69 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71 strlen(prms)+1);
72
73 if (binexpr) { // get/check bin expression
74 mp->binv = eparse(const_cast<char *>(binexpr));
75 if (mp->binv->type == NUM) { // constant expression (0)?
76 if ((int)evalue(mp->binv) != 0) {
77 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 binexpr);
79 error(USER, errmsg);
80 }
81 if (ncbins > 1) {
82 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83 error(USER, errmsg);
84 }
85 epfree(mp->binv, true);
86 mp->binv = NULL;
87 prms = "";
88 ncbins = 1;
89 }
90 }
91 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Get format identifier
107 const char *
108 formstr(int f)
109 {
110 switch (f) {
111 case 'a': return("ascii");
112 case 'f': return("float");
113 case 'd': return("double");
114 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115 }
116 return("unknown");
117 }
118
119 // Standard file data share function
120 RdataShare *
121 fileDataShare(const char *name, RCOutputOp op, size_t siz)
122 {
123 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
124 sprintf(errmsg, "cannot recover from '%s'", name);
125 error(SYSTEM, errmsg);
126 return NULL;
127 }
128 RdataShare * rds = new RdataShareFile(name, RSDOflags[op],
129 siz*(op != RCOforce));
130
131 if (!rds || (op == RCOforce && rds->Resize(siz) < siz)) {
132 delete rds;
133 sprintf(errmsg, "cannot create %lu byte output file '%s'",
134 (unsigned long)siz, name);
135 error(SYSTEM, errmsg);
136 return NULL;
137 }
138 return rds;
139 }
140
141 // Memory-mapped data share function
142 RdataShare *
143 mapDataShare(const char *name, RCOutputOp op, size_t siz)
144 {
145 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
146 sprintf(errmsg, "cannot recover from '%s'", name);
147 error(SYSTEM, errmsg);
148 return NULL;
149 }
150 RdataShare * rds = new RdataShareMap(name, RSDOflags[op],
151 siz*(op != RCOforce));
152
153 if (!rds || (op == RCOforce && rds->Resize(siz) < siz)) {
154 delete rds;
155 sprintf(errmsg, "cannot create %lu byte output map '%s'",
156 (unsigned long)siz, name);
157 error(SYSTEM, errmsg);
158 return NULL;
159 }
160 return rds;
161 }
162
163 // Set output format ('f', 'd', or 'c')
164 bool
165 RcontribSimulManager::SetDataFormat(int ty)
166 {
167 if (outList) {
168 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
169 return false;
170 }
171 switch (ty) {
172 case 'f':
173 dsiz = sizeof(float)*NCSAMP;
174 break;
175 case 'd':
176 dsiz = sizeof(double)*NCSAMP;
177 break;
178 case 'c':
179 dsiz = LSCOLR;
180 break;
181 default:
182 sprintf(errmsg, "unsupported output format '%c'", ty);
183 error(INTERNAL, errmsg);
184 return false;
185 }
186 dtyp = ty;
187 return true;
188 }
189
190 // static call-back for rcontrib ray-tracing
191 int
192 RcontribSimulManager::RctCall(RAY *r, void *cd)
193 {
194 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
195 return 0;
196 // shadow ray not on source?
197 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
198 return 0;
199
200 const char * mname = objptr(r->ro->omod)->oname;
201 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
202 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
203 if (!mp)
204 return 0; // not in our modifier list
205
206 if (rcp->HasFlag(RCcontrib) && sintens(r->rcol) <= FTINY)
207 return 0; // zero contribution
208
209 int bi = 0; // get bin index
210 if (mp->binv) {
211 worldfunc(RCCONTEXT, r);
212 set_eparams(mp->params);
213 double bval = evalue(mp->binv);
214 if (bval <= -.5)
215 return 0; // silently ignore negative bin index
216 bi = int(bval + .5);
217 }
218 DCOLORV * dvp = (*mp)[bi];
219 if (!dvp) {
220 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
221 error(WARNING, errmsg);
222 return 0;
223 }
224 SCOLOR contr;
225 raycontrib(contr, r, PRIMARY); // compute coefficient
226 if (rcp->HasFlag(RCcontrib))
227 smultscolor(contr, r->rcol); // -> value contribution
228
229 for (int i = 0; i < NCSAMP; i++)
230 *dvp++ += contr[i]; // accumulate color/spectrum
231 return 1;
232 }
233
// Scan printf-style format tst for the first conversion whose type
// character appears in match, skipping flag/width/length characters.
// Returns the offset of that type character within tst, or 0 if there is
// none.  A "%%" pair is skipped as a literal percent (unless match has '%').
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *	cp = tst;

	while (*cp != '\0') {
		if (*cp++ != '%')
			continue;
					// advance to conversion type (or end)
		while (strchr(convChars, *cp) == NULL)
			++cp;
		if (strchr(match, *cp) != NULL)
			break;		// found it (or reached end of string)
		++cp;
	}
	if (*cp == '\0')
		return 0;
	return int(cp - tst);
}
251
252 // Add a modifier and arguments, opening output file(s)
253 bool
254 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
255 const char *prms, const char *binval, int bincnt)
256 {
257 if (!modn | !outspec || !*modn | !*outspec) {
258 error(WARNING, "ignoring bad call to AddModifier()");
259 return false;
260 }
261 if (*outspec == '!') {
262 error(USER, "command output not supported by RcontribSimulManager");
263 return false;
264 }
265 if (!nChan) { // initial call?
266 if ((xres < 0) | (yres <= 0)) {
267 error(USER, "xres, yres must be set before first modifier");
268 return false;
269 }
270 if (!SetDataFormat(dtyp))
271 return false;
272 nChan = NCSAMP;
273 } else if (nChan != NCSAMP) {
274 error(USER, "# spectral channels must be fixed in AddModifier()");
275 return false;
276 }
277 if (Ready()) {
278 error(INTERNAL, "call to AddModifier() after PrepOutput()");
279 return false;
280 }
281 LUENT * lp = lu_find(&modLUT, modn);
282 if (lp->data) {
283 sprintf(errmsg, "duplicate modifier '%s'", modn);
284 error(USER, errmsg);
285 return false;
286 }
287 if (!lp->key) // create new entry
288 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
289
290 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
291 if (!mp) return false;
292 lp->data = (char *)mp;
293 const int modndx = hasFormat(outspec, "sb");
294 const int binndx = hasFormat(outspec, "diouxX");
295 int bin0 = 0;
296 char fnbuf[512];
297 if (!modndx || (binndx > 0) & (modndx > binndx))
298 sprintf(fnbuf, outspec, bin0, modn);
299 else
300 sprintf(fnbuf, outspec, modn, bin0);
301 RcontribOutput * olast = NULL;
302 RcontribOutput * op;
303 for (op = outList; op; op = (olast=op)->next) {
304 if (strcmp(op->GetName(), fnbuf))
305 continue;
306 if (modndx) { // this ain't right
307 sprintf(errmsg, "output name collision for '%s'", fnbuf);
308 error(USER, errmsg);
309 return false;
310 }
311 if ((binndx > 0) & (xres > 0)) {
312 sprintf(fnbuf, outspec, ++bin0);
313 continue; // each bin goes to another image
314 }
315 mp->coffset = op->rowBytes; // else add to what's there
316 break;
317 }
318 if (!op) { // create new output channel?
319 op = new RcontribOutput(fnbuf);
320 if (olast) olast->next = op;
321 else outList = op;
322 }
323 mp->opl = op; // first (maybe only) output channel
324 if (modndx) // remember modifier if part of name
325 op->omod = lp->key;
326 if (binndx) { // append output image/bin list
327 op->rowBytes += dsiz;
328 op->obin = bin0;
329 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
330 if (!modndx || (binndx > 0) & (modndx > binndx))
331 sprintf(fnbuf, outspec, bin0+bincnt, modn);
332 else
333 sprintf(fnbuf, outspec, modn, bin0+bincnt);
334 if (!op->next) {
335 olast = op;
336 olast->next = op = new RcontribOutput(fnbuf);
337 if (modndx) op->omod = lp->key;
338 op->obin = bin0+bincnt;
339 } else {
340 op = op->next;
341 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
342 "bin number mismatch in AddModifier()");
343 }
344 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
345 "row offset mismatch in AddModifier()");
346 op->rowBytes += dsiz;
347 }
348 } else // else send all results to this channel
349 op->rowBytes += mp->nbins*dsiz;
350 return true;
351 }
352
353 // Add a file of modifiers with associated arguments
354 bool
355 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
356 const char *prms,
357 const char *binval, int bincnt)
358 {
359 char * path = getpath(const_cast<char *>(modfn),
360 getrlibpath(), R_OK);
361 FILE * fp;
362 if (!path || !(fp = fopen(path, "r"))) {
363 sprintf(errmsg, "cannot %s modifier file '%s'",
364 path ? "open" : "find", modfn);
365 error(SYSTEM, errmsg);
366 return false;
367 }
368 char mod[MAXSTR];
369 while (fgetword(mod, sizeof(mod), fp))
370 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
371 fclose(fp);
372 return false;
373 }
374 fclose(fp);
375 return true;
376 }
377
378 // call-back to check if modifier has been loaded
379 static int
380 checkModExists(const LUENT *lp, void *p)
381 {
382 OBJECT mod = modifier(lp->key);
383
384 if ((mod != OVOID) & (mod < nsceneobjs))
385 return 1;
386
387 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
388 error(WARNING, errmsg);
389 return 0;
390 }
391
392 // Prepare output channels and return # completed rows
393 int
394 RcontribSimulManager::PrepOutput()
395 {
396 if (!outList || !RtraceSimulManager::Ready()) {
397 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
398 return -1;
399 }
400 if (!cdsF) {
401 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
402 return -1;
403 }
404 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
405 return -1;
406
407 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
408 int remWarnings = 20;
409 for (RcontribOutput *op = outList; op; op = op->next) {
410 if (op->rData) {
411 error(INTERNAL, "output channel already open in PrepOutput()");
412 return -1;
413 }
414 op->nRows = outList->nRows;
415 op->rData = (*cdsF)(op->ofname, outOp,
416 GetHeadLen()+1024 + op->nRows*op->rowBytes);
417 freeqstr(op->ofname); op->ofname = NULL;
418 if (outOp == RCOrecover) {
419 int rd = op->CheckHeader(this);
420 if (rd < 0)
421 return -1;
422 if (rd >= op->nRows) {
423 if (remWarnings > 0) {
424 sprintf(errmsg, "recovered output '%s' is complete",
425 op->GetName());
426 error(WARNING, --remWarnings ? errmsg : "etc...");
427 }
428 rd = op->nRows;
429 }
430 if (!rInPos | (rInPos > rd)) rInPos = rd;
431 } else if (!op->NewHeader(this))
432 return -1;
433 // make sure there's room
434 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
435 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
436 return -1; // calls error() for us
437 }
438 rowsDone.NewBitMap(outList->nRows); // create row completion map
439 rowsDone.ClearBits(0, rInPos, true);
440 return nrDone = rInPos;
441 }
442
443 // Create header in open write-only channel
444 bool
445 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
446 {
447 int headlim = rcp->GetHeadLen() + 1024;
448 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
449 int esiz;
450 const int etyp = rcp->GetFormat(&esiz);
451
452 strcpy(hdr, HDRSTR);
453 strcat(hdr, "RADIANCE\n");
454 begData = strlen(hdr);
455 strcpy(hdr+begData, rcp->GetHeadStr());
456 begData += rcp->GetHeadLen();
457 if (omod) {
458 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
459 begData += strlen(hdr+begData);
460 }
461 if (obin >= 0) {
462 sprintf(hdr+begData, "BIN=%d\n", obin);
463 begData += strlen(hdr+begData);
464 }
465 strcpy(hdr+begData, ROWZEROSTR);
466 rowCountPos = begData+LNROWSTR;
467 begData += sizeof(ROWZEROSTR)-1;
468 if (!rcp->xres | (rowBytes > esiz)) {
469 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
470 begData += strlen(hdr+begData);
471 }
472 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
473 begData += strlen(hdr+begData);
474 if (NCSAMP > 3) {
475 sprintf(hdr+begData, "%s %g %g %g %g\n", WLSPLTSTR,
476 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
477 begData += strlen(hdr+begData);
478 }
479 if (etyp != 'c') {
480 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
481 begData += strlen(hdr+begData);
482 }
483 int align = 0;
484 switch (etyp) {
485 case 'f':
486 align = sizeof(float);
487 break;
488 case 'd':
489 align = sizeof(double);
490 break;
491 case 'c':
492 break;
493 default:
494 error(INTERNAL, "unsupported data type in NewHeader()");
495 return false;
496 }
497 strcpy(hdr+begData, FMTSTR); // write format string
498 begData += strlen(hdr+begData);
499 strcpy(hdr+begData, formstr(etyp));
500 begData += strlen(hdr+begData);
501 if (align) // align data at end of header
502 while ((begData+2) % align)
503 hdr[begData++] = ' ';
504 hdr[begData++] = '\n'; // EOL for data format
505 hdr[begData++] = '\n'; // end of nominal header
506 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
507 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
508 begData += strlen(hdr+begData);
509 }
510 return rData->ReleaseMemory(hdr, RDSwrite);
511 }
512
513 // find string argument in header for named variable
514 static const char *
515 findArgs(const char *hdr, const char *vnm, int len)
516 {
517 const char * npos = strnstr(hdr, vnm, len);
518
519 if (!npos) return NULL;
520
521 npos += strlen(vnm); // find start of (first) argument
522 len -= npos - hdr;
523 while (len-- > 0 && (*npos == '=') | isspace(*npos))
524 if (*npos++ == '\n')
525 return NULL;
526
527 return (len >= 0) ? npos : NULL;
528 }
529
530 // Load and check header in read/write channel
531 int
532 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
533 {
534 int esiz;
535 const int etyp = rcp->GetFormat(&esiz);
536 const int maxlen = rcp->GetHeadLen() + 1024;
537 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
538 const char * cp;
539 // find end of header
540 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
541 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
542 error(USER, errmsg);
543 return -1;
544 }
545 begData = cp - hdr + 1; // increment again at end
546 // check # components
547 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
548 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
549 error(USER, errmsg);
550 return -1;
551 }
552 // check format
553 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
554 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
555 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
556 error(USER, errmsg);
557 return -1;
558 }
559 // check #columns
560 if ((cp = findArgs(hdr, "NCOLS=", begData)) ? (atoi(cp)*esiz != rowBytes)
561 : (!rcp->xres | (rowBytes > esiz))) {
562 sprintf(errmsg, "expected NCOLS=%d in '%s'", int(rowBytes/esiz), GetName());
563 error(USER, errmsg);
564 return -1;
565 }
566 // find row count
567 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
568 sprintf(errmsg, "missing NROWS in '%s'", GetName());
569 error(USER, errmsg);
570 return -1;
571 }
572 rowCountPos = cp - hdr;
573 int rlast = atoi(cp);
574 begData++; // advance past closing EOL
575 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
576 char rbuf[64];
577 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
578 int rlen = strlen(rbuf);
579 if (strncmp(rbuf, hdr+begData, rlen)) {
580 sprintf(errmsg, "bad resolution string in '%s'", GetName());
581 error(USER, errmsg);
582 return -1;
583 }
584 begData += rlen;
585 }
586 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
587 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
588 }
589
590 // Rewind calculation (previous results unchanged)
591 bool
592 RcontribSimulManager::ResetRow(int r)
593 {
594 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
595 error(WARNING, "ignoring bad call to ResetRow()");
596 return (r == rInPos);
597 }
598 FlushQueue(); // finish current and reset
599 for (RcontribOutput *op = outList; op; op = op->next)
600 if (!op->SetRowsDone(r))
601 return false;
602
603 rowsDone.ClearBits(r, rInPos-r, false);
604 nrDone = rInPos = r;
605 return true;
606 }
607
608 // call-back for averaging each modifier's contributions to assigned channel(s)
609 static int
610 putModContrib(const LUENT *lp, void *p)
611 {
612 RcontribMod * mp = (RcontribMod *)lp->data;
613 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
614 const double sca = 1./rcp->accum;
615 const DCOLORV * dvp = mp->cbin;
616 RcontribOutput * op = mp->opl;
617 int i, n;
618
619 switch (rcp->GetFormat()) { // conversion based on output type
620 case 'd': {
621 double * dvo = (double *)op->InsertionP(mp->coffset);
622 for (n = mp->nbins; n--; ) {
623 for (i = 0; i < NCSAMP; i++)
624 *dvo++ = *dvp++ * sca;
625 if ((op->obin >= 0) & (n > 0))
626 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
627 }
628 } break;
629 case 'f': {
630 float * fvo = (float *)op->InsertionP(mp->coffset);
631 for (n = mp->nbins; n--; ) {
632 for (i = 0; i < NCSAMP; i++)
633 *fvo++ = float(*dvp++ * sca);
634 if ((op->obin >= 0) & (n > 0))
635 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
636 }
637 } break;
638 case 'c': {
639 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
640 for (n = mp->nbins; n--; ) {
641 SCOLOR scol;
642 for (i = 0; i < NCSAMP; i++)
643 scol[i] = COLORV(*dvp++ * sca);
644 scolor_scolr(cvo, scol);
645 cvo += LSCOLR;
646 if ((op->obin >= 0) & (n > 0))
647 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
648 }
649 } break;
650 default:
651 error(CONSISTENCY, "unsupported output type in putModContrib()");
652 return -1;
653 }
654 // clear for next tally
655 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
656 return 1;
657 }
658
659 // Add a ray/bundle to compute next record
660 int
661 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
662 {
663 if (!Ready())
664 return 0;
665 if (rInPos >= outList->nRows) {
666 error(WARNING, "ComputeRecord() called after last record");
667 return 0;
668 }
669 if (nkids > 0) { // in parent process?
670 int k = GetChild(false); // updates output rows
671 if (k < 0) return -1; // someone died?
672 RowAssignment rass;
673 rass.row = kidRow[k] = rInPos++;
674 rass.ac = accum;
675 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
676 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
677 != sizeof(FVECT)*2*accum) {
678 error(SYSTEM, "cannot write to child; dead process?");
679 return -1;
680 }
681 return accum; // tracing/output happens in child
682 }
683 if (NCSAMP != nChan) {
684 error(INTERNAL, "number of color channels must remain fixed");
685 return -1;
686 }
687 // actual work is done here...
688 if (EnqueueBundle(orig_direc, accum) < 0)
689 return -1;
690
691 RcontribOutput * op; // prepare output buffers
692 for (op = outList; op; op = op->next)
693 if (!op->GetRow(rInPos))
694 return -1;
695 // convert averages & clear
696 if (lu_doall(&modLUT, putModContrib, this) < 0)
697 return -1;
698 // write buffers
699 for (op = outList; op; op = op->next)
700 op->DoneRow();
701
702 if (!nkids) // update row if solo process
703 UpdateRowsDone(rInPos++); // XXX increment here is critical
704
705 return accum;
706 }
707
708 // Get next available child, returning index or -1 if forceWait & idling
709 int
710 RcontribSimulManager::GetChild(bool forceWait)
711 {
712 if (nkids <= 0)
713 return -1;
714 // take inventory
715 int pn, n = 0;
716 fd_set readset, errset;
717 FD_ZERO(&readset); FD_ZERO(&errset);
718 for (pn = nkids; pn--; ) {
719 if (kidRow[pn] < 0) { // child already ready?
720 if (forceWait) continue;
721 return pn; // good enough
722 }
723 FD_SET(kid[pn].r, &readset); // will check on this one
724 FD_SET(kid[pn].r, &errset);
725 if (kid[pn].r >= n)
726 n = kid[pn].r + 1;
727 }
728 if (!n) // every child is idle?
729 return -1;
730 // wait on "busy" child(ren)
731 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
732 if (errno != EINTR) {
733 error(SYSTEM, "select call failed in GetChild()");
734 return -1;
735 }
736 char buf[sizeof(ROW_DONE)] = "X";
737 pn = -1; // get flags set by select
738 for (n = nkids; n--; )
739 if (kidRow[n] >= 0 &&
740 FD_ISSET(kid[n].r, &readset) |
741 FD_ISSET(kid[n].r, &errset)) {
742 // check for error
743 if (FD_ISSET(kid[n].r, &errset) ||
744 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
745 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
746 return -1;
747 // update output row counts
748 UpdateRowsDone(kidRow[n]);
749 kidRow[n] = -1; // flag child available
750 pn = n;
751 }
752 return pn;
753 }
754
755 // Update row completion bitmap and update outputs (does not change rInPos)
756 bool
757 RcontribSimulManager::UpdateRowsDone(int r)
758 {
759 if (!rowsDone.TestAndSet(r)) {
760 error(WARNING, "redundant call to UpdateRowsDone()");
761 return false;
762 }
763 GetRowFinished();
764 if (nrDone <= r)
765 return true; // nothing to update, yet
766 for (RcontribOutput *op = outList; op; op = op->next)
767 if (!op->SetRowsDone(nrDone))
768 return false;
769 return true; // up-to-date
770 }
771
772 // Run rcontrib child process (never returns)
773 void
774 RcontribSimulManager::RunChild()
775 {
776 FVECT * vecList = NULL;
777 RowAssignment rass;
778 ssize_t nr;
779
780 accum = 0;
781 errno = 0;
782 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
783 if (!rass.ac) {
784 error(CONSISTENCY, "bad accumulator count in child");
785 exit(1);
786 }
787 if (rass.ac > accum) {
788 efree(vecList);
789 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
790 }
791 accum = rass.ac;
792 rInPos = rass.row;
793
794 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
795 sizeof(FVECT)*2*accum)
796 break;
797
798 if (ComputeRecord(vecList) <= 0)
799 exit(1);
800 // signal this row is done
801 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
802 exit(1);
803 }
804 if (nr) {
805 error(SYSTEM, "read error in child process");
806 exit(1);
807 }
808 exit(0);
809 }
810
811 // Add child processes as indicated
812 bool
813 RcontribSimulManager::StartKids(int n2go)
814 {
815 if ((n2go <= 0) | (nkids + n2go <= 1))
816 return false;
817
818 if (!nkids)
819 cow_memshare(); // preload objects
820
821 n2go += nkids; // => desired brood size
822 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
823 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
824
825 fflush(stdout); // shouldn't use, anyway
826 while (nkids < n2go) {
827 kid[nkids] = sp_inactive;
828 int rv = open_process(&kid[nkids], NULL);
829 if (!rv) { // in child process?
830 while (nkids-- > 0) {
831 close(kid[nkids].r);
832 close(kid[nkids].w);
833 }
834 free(kid); free(kidRow);
835 kid = NULL; kidRow = NULL;
836 RunChild(); // should never return
837 _exit(1);
838 }
839 if (rv < 0) {
840 error(SYSTEM, "cannot fork worker process");
841 return false;
842 }
843 kidRow[nkids++] = -1; // newborn is ready
844 }
845 return true;
846 }
847
848 // Reap the indicated number of children (all if 0)
849 int
850 RcontribSimulManager::StopKids(int n2end)
851 {
852 if (nkids <= 0)
853 return 0;
854 FlushQueue();
855 int status = 0;
856 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
857 status = close_processes(kid, nkids);
858 free(kid); free(kidRow);
859 kid = NULL; kidRow = NULL;
860 nkids = 0;
861 } else { // else shrink family
862 int st;
863 while (n2end-- > 0)
864 if ((st = close_process(&kid[--nkids])) > 0)
865 status = st;
866 // could call realloc(), but it hardly matters
867 }
868 if (status) {
869 sprintf(errmsg, "non-zero (%d) status from child", status);
870 error(WARNING, errmsg);
871 }
872 return status;
873 }
874
875 // Set number of computation threads (0 => #cores)
876 int
877 RcontribSimulManager::SetThreadCount(int nt)
878 {
879 if (!Ready()) {
880 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
881 return 0;
882 }
883 if (nt < 0)
884 return NThreads();
885 if (!nt) nt = GetNCores();
886 int status = 0;
887 if (nt == 1)
888 status = StopKids();
889 else if (nt < nkids)
890 status = StopKids(nkids-nt);
891 else if (nt > nkids)
892 StartKids(nt-nkids);
893 if (status) {
894 sprintf(errmsg, "non-zero (%d) status from child", status);
895 error(WARNING, errmsg);
896 }
897 return NThreads();
898 }