ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.23
Committed: Tue Nov 11 02:08:52 2025 UTC (4 days, 17 hours ago) by greg
Branch: MAIN
Changes since 2.22: +12 -5 lines
Log Message:
perf(rcontrib,rfluxmtx,rxcontrib,rxfluxmtx): Better preemptive check

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.22 2025/11/11 00:19:10 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
#include <unistd.h>
#include <ctype.h>
#include <limits.h>
#include "platform.h"
#include "selcall.h"
#include "RcontribSimulManager.h"
#include "func.h"
#include "resolu.h"
#include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // allocate rcontrib accumulator
58 static RcontribMod *
59 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60 {
61 if (binexpr && !*binexpr) binexpr = NULL;
62 if (!prms) prms = "";
63 if ((ncbins > 1) & !binexpr) {
64 error(USER, "missing bin expression");
65 return NULL;
66 }
67 if (ncbins < 1) ncbins = 1;
68
69 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71 strlen(prms)+1);
72
73 if (binexpr) { // get/check bin expression
74 mp->binv = eparse(const_cast<char *>(binexpr));
75 if (mp->binv->type == NUM) { // constant expression (0)?
76 if ((int)evalue(mp->binv) != 0) {
77 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 binexpr);
79 error(USER, errmsg);
80 }
81 if (ncbins > 1) {
82 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83 error(USER, errmsg);
84 }
85 epfree(mp->binv, true);
86 mp->binv = NULL;
87 prms = "";
88 ncbins = 1;
89 }
90 }
91 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Get format identifier
107 const char *
108 formstr(int f)
109 {
110 switch (f) {
111 case 'a': return("ascii");
112 case 'f': return("float");
113 case 'd': return("double");
114 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115 }
116 return("unknown");
117 }
118
119 // Standard file data share function
120 RdataShare *
121 fileDataShare(const char *name, RCOutputOp op, size_t siz)
122 {
123 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
124 sprintf(errmsg, "cannot recover from '%s'", name);
125 error(SYSTEM, errmsg);
126 return NULL;
127 }
128 RdataShare * rds = new RdataShareFile(name, RSDOflags[op],
129 siz*(op != RCOforce));
130
131 if (!rds || (op == RCOforce && rds->Resize(siz) < siz)) {
132 delete rds;
133 sprintf(errmsg, "cannot create %lu byte output file '%s'",
134 (unsigned long)siz, name);
135 error(SYSTEM, errmsg);
136 return NULL;
137 }
138 return rds;
139 }
140
141 // Memory-mapped data share function
142 RdataShare *
143 mapDataShare(const char *name, RCOutputOp op, size_t siz)
144 {
145 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
146 sprintf(errmsg, "cannot recover from '%s'", name);
147 error(SYSTEM, errmsg);
148 return NULL;
149 }
150 RdataShare * rds = new RdataShareMap(name, RSDOflags[op],
151 siz*(op != RCOforce));
152
153 if (!rds || (op == RCOforce && rds->Resize(siz) < siz)) {
154 delete rds;
155 sprintf(errmsg, "cannot create %lu byte output map '%s'",
156 (unsigned long)siz, name);
157 error(SYSTEM, errmsg);
158 return NULL;
159 }
160 return rds;
161 }
162
163 // Set output format ('f', 'd', or 'c')
164 bool
165 RcontribSimulManager::SetDataFormat(int ty)
166 {
167 if (outList) {
168 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
169 return false;
170 }
171 switch (ty) {
172 case 'f':
173 dsiz = sizeof(float)*NCSAMP;
174 break;
175 case 'd':
176 dsiz = sizeof(double)*NCSAMP;
177 break;
178 case 'c':
179 dsiz = LSCOLR;
180 break;
181 default:
182 sprintf(errmsg, "unsupported output format '%c'", ty);
183 error(INTERNAL, errmsg);
184 return false;
185 }
186 dtyp = ty;
187 return true;
188 }
189
190 // static call-back for rcontrib ray-tracing
191 int
192 RcontribSimulManager::RctCall(RAY *r, void *cd)
193 {
194 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
195 return 0;
196 // shadow ray not on source?
197 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
198 return 0;
199
200 const char * mname = objptr(r->ro->omod)->oname;
201 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
202 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
203 if (!mp)
204 return 0; // not in our modifier list
205
206 int i; // pre-emptive check for zero
207 if (rcp->HasFlag(RCcontrib)) {
208 for (i = NCSAMP; i--; )
209 if (r->rcoef[i]*r->rcol[i] > FTINY)
210 break;
211 if (i < 0)
212 return 0; // zero contribution
213 } else if (sintens(r->rcoef) <= FTINY)
214 return 0; // zero coefficient
215
216 int bi = 0; // get bin index
217 if (mp->binv) {
218 worldfunc(RCCONTEXT, r);
219 set_eparams(mp->params);
220 double bval = evalue(mp->binv);
221 if (bval <= -.5)
222 return 0; // silently ignore negative bin index
223 bi = int(bval + .5);
224 }
225 DCOLORV * dvp = (*mp)[bi];
226 if (!dvp) {
227 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
228 error(WARNING, errmsg);
229 return 0;
230 }
231 SCOLOR contr;
232 raycontrib(contr, r, PRIMARY); // compute coefficient
233 if (rcp->HasFlag(RCcontrib))
234 smultscolor(contr, r->rcol); // -> value contribution
235
236 for (i = 0; i < NCSAMP; i++)
237 *dvp++ += contr[i]; // accumulate color/spectrum
238 return 1;
239 }
240
// check for given format type in string, return last char position
//	Scans printf-style conversions in tst and returns the index of the
//	first conversion character that appears in match, or 0 if none.
//	"%%" is skipped as a literal percent.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	size_t			pos = 0;

	for ( ; ; ) {
					// advance to next '%' introducer
		while (tst[pos] && tst[pos] != '%')
			pos++;
		if (!tst[pos])
			return 0;
		pos++;
					// skip flags/width/precision
		while (!strchr(convChars, tst[pos]))
			pos++;
		if (!tst[pos])
			return 0;
		if (strchr(match, tst[pos]))
			return int(pos);
		pos++;			// not wanted -- keep looking
	}
}
258
259 // Add a modifier and arguments, opening output file(s)
260 bool
261 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
262 const char *prms, const char *binval, int bincnt)
263 {
264 if (!modn | !outspec || !*modn | !*outspec) {
265 error(WARNING, "ignoring bad call to AddModifier()");
266 return false;
267 }
268 if (*outspec == '!') {
269 error(USER, "command output not supported by RcontribSimulManager");
270 return false;
271 }
272 if (!nChan) { // initial call?
273 if ((xres < 0) | (yres <= 0)) {
274 error(USER, "xres, yres must be set before first modifier");
275 return false;
276 }
277 if (!SetDataFormat(dtyp))
278 return false;
279 nChan = NCSAMP;
280 } else if (nChan != NCSAMP) {
281 error(USER, "# spectral channels must be fixed in AddModifier()");
282 return false;
283 }
284 if (Ready()) {
285 error(INTERNAL, "call to AddModifier() after PrepOutput()");
286 return false;
287 }
288 LUENT * lp = lu_find(&modLUT, modn);
289 if (lp->data) {
290 sprintf(errmsg, "duplicate modifier '%s'", modn);
291 error(USER, errmsg);
292 return false;
293 }
294 if (!lp->key) // create new entry
295 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
296
297 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
298 if (!mp) return false;
299 lp->data = (char *)mp;
300 const int modndx = hasFormat(outspec, "sb");
301 const int binndx = hasFormat(outspec, "diouxX");
302 int bin0 = 0;
303 char fnbuf[512];
304 if (!modndx || (binndx > 0) & (modndx > binndx))
305 sprintf(fnbuf, outspec, bin0, modn);
306 else
307 sprintf(fnbuf, outspec, modn, bin0);
308 RcontribOutput * olast = NULL;
309 RcontribOutput * op;
310 for (op = outList; op; op = (olast=op)->next) {
311 if (strcmp(op->GetName(), fnbuf))
312 continue;
313 if (modndx) { // this ain't right
314 sprintf(errmsg, "output name collision for '%s'", fnbuf);
315 error(USER, errmsg);
316 return false;
317 }
318 if ((binndx > 0) & (xres > 0)) {
319 sprintf(fnbuf, outspec, ++bin0);
320 continue; // each bin goes to another image
321 }
322 mp->coffset = op->rowBytes; // else add to what's there
323 break;
324 }
325 if (!op) { // create new output channel?
326 op = new RcontribOutput(fnbuf);
327 if (olast) olast->next = op;
328 else outList = op;
329 }
330 mp->opl = op; // first (maybe only) output channel
331 if (modndx) // remember modifier if part of name
332 op->omod = lp->key;
333 if (binndx) { // append output image/bin list
334 op->rowBytes += dsiz;
335 op->obin = bin0;
336 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
337 if (!modndx || (binndx > 0) & (modndx > binndx))
338 sprintf(fnbuf, outspec, bin0+bincnt, modn);
339 else
340 sprintf(fnbuf, outspec, modn, bin0+bincnt);
341 if (!op->next) {
342 olast = op;
343 olast->next = op = new RcontribOutput(fnbuf);
344 if (modndx) op->omod = lp->key;
345 op->obin = bin0+bincnt;
346 } else {
347 op = op->next;
348 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
349 "bin number mismatch in AddModifier()");
350 }
351 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
352 "row offset mismatch in AddModifier()");
353 op->rowBytes += dsiz;
354 }
355 } else // else send all results to this channel
356 op->rowBytes += mp->nbins*dsiz;
357 return true;
358 }
359
360 // Add a file of modifiers with associated arguments
361 bool
362 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
363 const char *prms,
364 const char *binval, int bincnt)
365 {
366 char * path = getpath(const_cast<char *>(modfn),
367 getrlibpath(), R_OK);
368 FILE * fp;
369 if (!path || !(fp = fopen(path, "r"))) {
370 sprintf(errmsg, "cannot %s modifier file '%s'",
371 path ? "open" : "find", modfn);
372 error(SYSTEM, errmsg);
373 return false;
374 }
375 char mod[MAXSTR];
376 while (fgetword(mod, sizeof(mod), fp))
377 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
378 fclose(fp);
379 return false;
380 }
381 fclose(fp);
382 return true;
383 }
384
385 // call-back to check if modifier has been loaded
386 static int
387 checkModExists(const LUENT *lp, void *p)
388 {
389 OBJECT mod = modifier(lp->key);
390
391 if ((mod != OVOID) & (mod < nsceneobjs))
392 return 1;
393
394 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
395 error(WARNING, errmsg);
396 return 0;
397 }
398
399 // Prepare output channels and return # completed rows
400 int
401 RcontribSimulManager::PrepOutput()
402 {
403 if (!outList || !RtraceSimulManager::Ready()) {
404 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
405 return -1;
406 }
407 if (!cdsF) {
408 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
409 return -1;
410 }
411 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
412 return -1;
413
414 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
415 int remWarnings = 20;
416 for (RcontribOutput *op = outList; op; op = op->next) {
417 if (op->rData) {
418 error(INTERNAL, "output channel already open in PrepOutput()");
419 return -1;
420 }
421 op->nRows = outList->nRows;
422 op->rData = (*cdsF)(op->ofname, outOp,
423 GetHeadLen()+1024 + op->nRows*op->rowBytes);
424 freeqstr(op->ofname); op->ofname = NULL;
425 if (outOp == RCOrecover) {
426 int rd = op->CheckHeader(this);
427 if (rd < 0)
428 return -1;
429 if (rd >= op->nRows) {
430 if (remWarnings > 0) {
431 sprintf(errmsg, "recovered output '%s' is complete",
432 op->GetName());
433 error(WARNING, --remWarnings ? errmsg : "etc...");
434 }
435 rd = op->nRows;
436 }
437 if (!rInPos | (rInPos > rd)) rInPos = rd;
438 } else if (!op->NewHeader(this))
439 return -1;
440 // make sure there's room
441 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
442 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
443 return -1; // calls error() for us
444 }
445 rowsDone.NewBitMap(outList->nRows); // create row completion map
446 rowsDone.ClearBits(0, rInPos, true);
447 return nrDone = rInPos;
448 }
449
450 // Create header in open write-only channel
451 bool
452 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
453 {
454 int headlim = rcp->GetHeadLen() + 1024;
455 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
456 int esiz;
457 const int etyp = rcp->GetFormat(&esiz);
458
459 strcpy(hdr, HDRSTR);
460 strcat(hdr, "RADIANCE\n");
461 begData = strlen(hdr);
462 strcpy(hdr+begData, rcp->GetHeadStr());
463 begData += rcp->GetHeadLen();
464 if (omod) {
465 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
466 begData += strlen(hdr+begData);
467 }
468 if (obin >= 0) {
469 sprintf(hdr+begData, "BIN=%d\n", obin);
470 begData += strlen(hdr+begData);
471 }
472 strcpy(hdr+begData, ROWZEROSTR);
473 rowCountPos = begData+LNROWSTR;
474 begData += sizeof(ROWZEROSTR)-1;
475 if (!rcp->xres | (rowBytes > esiz)) {
476 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
477 begData += strlen(hdr+begData);
478 }
479 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
480 begData += strlen(hdr+begData);
481 if (NCSAMP > 3) {
482 sprintf(hdr+begData, "%s %g %g %g %g\n", WLSPLTSTR,
483 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
484 begData += strlen(hdr+begData);
485 }
486 if (etyp != 'c') {
487 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
488 begData += strlen(hdr+begData);
489 }
490 int align = 0;
491 switch (etyp) {
492 case 'f':
493 align = sizeof(float);
494 break;
495 case 'd':
496 align = sizeof(double);
497 break;
498 case 'c':
499 break;
500 default:
501 error(INTERNAL, "unsupported data type in NewHeader()");
502 return false;
503 }
504 strcpy(hdr+begData, FMTSTR); // write format string
505 begData += strlen(hdr+begData);
506 strcpy(hdr+begData, formstr(etyp));
507 begData += strlen(hdr+begData);
508 if (align) // align data at end of header
509 while ((begData+2) % align)
510 hdr[begData++] = ' ';
511 hdr[begData++] = '\n'; // EOL for data format
512 hdr[begData++] = '\n'; // end of nominal header
513 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
514 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
515 begData += strlen(hdr+begData);
516 }
517 return rData->ReleaseMemory(hdr, RDSwrite);
518 }
519
520 // find string argument in header for named variable
521 static const char *
522 findArgs(const char *hdr, const char *vnm, int len)
523 {
524 const char * npos = strnstr(hdr, vnm, len);
525
526 if (!npos) return NULL;
527
528 npos += strlen(vnm); // find start of (first) argument
529 len -= npos - hdr;
530 while (len-- > 0 && (*npos == '=') | isspace(*npos))
531 if (*npos++ == '\n')
532 return NULL;
533
534 return (len >= 0) ? npos : NULL;
535 }
536
537 // Load and check header in read/write channel
538 int
539 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
540 {
541 int esiz;
542 const int etyp = rcp->GetFormat(&esiz);
543 const int maxlen = rcp->GetHeadLen() + 1024;
544 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
545 const char * cp;
546 // find end of header
547 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
548 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
549 error(USER, errmsg);
550 return -1;
551 }
552 begData = cp - hdr + 1; // increment again at end
553 // check # components
554 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
555 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
556 error(USER, errmsg);
557 return -1;
558 }
559 // check format
560 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
561 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
562 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
563 error(USER, errmsg);
564 return -1;
565 }
566 // check #columns
567 if ((cp = findArgs(hdr, "NCOLS=", begData)) ? (atoi(cp)*esiz != rowBytes)
568 : (!rcp->xres | (rowBytes > esiz))) {
569 sprintf(errmsg, "expected NCOLS=%d in '%s'", int(rowBytes/esiz), GetName());
570 error(USER, errmsg);
571 return -1;
572 }
573 // find row count
574 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
575 sprintf(errmsg, "missing NROWS in '%s'", GetName());
576 error(USER, errmsg);
577 return -1;
578 }
579 rowCountPos = cp - hdr;
580 int rlast = atoi(cp);
581 begData++; // advance past closing EOL
582 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
583 char rbuf[64];
584 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
585 int rlen = strlen(rbuf);
586 if (strncmp(rbuf, hdr+begData, rlen)) {
587 sprintf(errmsg, "bad resolution string in '%s'", GetName());
588 error(USER, errmsg);
589 return -1;
590 }
591 begData += rlen;
592 }
593 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
594 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
595 }
596
597 // Rewind calculation (previous results unchanged)
598 bool
599 RcontribSimulManager::ResetRow(int r)
600 {
601 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
602 error(WARNING, "ignoring bad call to ResetRow()");
603 return (r == rInPos);
604 }
605 FlushQueue(); // finish current and reset
606 for (RcontribOutput *op = outList; op; op = op->next)
607 if (!op->SetRowsDone(r))
608 return false;
609
610 rowsDone.ClearBits(r, rInPos-r, false);
611 nrDone = rInPos = r;
612 return true;
613 }
614
615 // call-back for averaging each modifier's contributions to assigned channel(s)
616 static int
617 putModContrib(const LUENT *lp, void *p)
618 {
619 RcontribMod * mp = (RcontribMod *)lp->data;
620 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
621 const double sca = 1./rcp->accum;
622 const DCOLORV * dvp = mp->cbin;
623 RcontribOutput * op = mp->opl;
624 int i, n;
625
626 switch (rcp->GetFormat()) { // conversion based on output type
627 case 'd': {
628 double * dvo = (double *)op->InsertionP(mp->coffset);
629 for (n = mp->nbins; n--; ) {
630 for (i = 0; i < NCSAMP; i++)
631 *dvo++ = *dvp++ * sca;
632 if ((op->obin >= 0) & (n > 0))
633 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
634 }
635 } break;
636 case 'f': {
637 float * fvo = (float *)op->InsertionP(mp->coffset);
638 for (n = mp->nbins; n--; ) {
639 for (i = 0; i < NCSAMP; i++)
640 *fvo++ = float(*dvp++ * sca);
641 if ((op->obin >= 0) & (n > 0))
642 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
643 }
644 } break;
645 case 'c': {
646 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
647 for (n = mp->nbins; n--; ) {
648 SCOLOR scol;
649 for (i = 0; i < NCSAMP; i++)
650 scol[i] = COLORV(*dvp++ * sca);
651 scolor_scolr(cvo, scol);
652 cvo += LSCOLR;
653 if ((op->obin >= 0) & (n > 0))
654 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
655 }
656 } break;
657 default:
658 error(CONSISTENCY, "unsupported output type in putModContrib()");
659 return -1;
660 }
661 // clear for next tally
662 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
663 return 1;
664 }
665
666 // Add a ray/bundle to compute next record
667 int
668 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
669 {
670 if (!Ready())
671 return 0;
672 if (rInPos >= outList->nRows) {
673 error(WARNING, "ComputeRecord() called after last record");
674 return 0;
675 }
676 if (nkids > 0) { // in parent process?
677 int k = GetChild(false); // updates output rows
678 if (k < 0) return -1; // someone died?
679 RowAssignment rass;
680 rass.row = kidRow[k] = rInPos++;
681 rass.ac = accum;
682 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
683 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
684 != sizeof(FVECT)*2*accum) {
685 error(SYSTEM, "cannot write to child; dead process?");
686 return -1;
687 }
688 return accum; // tracing/output happens in child
689 }
690 if (NCSAMP != nChan) {
691 error(INTERNAL, "number of color channels must remain fixed");
692 return -1;
693 }
694 // actual work is done here...
695 if (EnqueueBundle(orig_direc, accum) < 0)
696 return -1;
697
698 RcontribOutput * op; // prepare output buffers
699 for (op = outList; op; op = op->next)
700 if (!op->GetRow(rInPos))
701 return -1;
702 // convert averages & clear
703 if (lu_doall(&modLUT, putModContrib, this) < 0)
704 return -1;
705 // write buffers
706 for (op = outList; op; op = op->next)
707 op->DoneRow();
708
709 if (!nkids) // update row if solo process
710 UpdateRowsDone(rInPos++); // XXX increment here is critical
711
712 return accum;
713 }
714
715 // Get next available child, returning index or -1 if forceWait & idling
716 int
717 RcontribSimulManager::GetChild(bool forceWait)
718 {
719 if (nkids <= 0)
720 return -1;
721 // take inventory
722 int pn, n = 0;
723 fd_set readset, errset;
724 FD_ZERO(&readset); FD_ZERO(&errset);
725 for (pn = nkids; pn--; ) {
726 if (kidRow[pn] < 0) { // child already ready?
727 if (forceWait) continue;
728 return pn; // good enough
729 }
730 FD_SET(kid[pn].r, &readset); // will check on this one
731 FD_SET(kid[pn].r, &errset);
732 if (kid[pn].r >= n)
733 n = kid[pn].r + 1;
734 }
735 if (!n) // every child is idle?
736 return -1;
737 // wait on "busy" child(ren)
738 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
739 if (errno != EINTR) {
740 error(SYSTEM, "select call failed in GetChild()");
741 return -1;
742 }
743 char buf[sizeof(ROW_DONE)] = "X";
744 pn = -1; // get flags set by select
745 for (n = nkids; n--; )
746 if (kidRow[n] >= 0 &&
747 FD_ISSET(kid[n].r, &readset) |
748 FD_ISSET(kid[n].r, &errset)) {
749 // check for error
750 if (FD_ISSET(kid[n].r, &errset) ||
751 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
752 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
753 return -1;
754 // update output row counts
755 UpdateRowsDone(kidRow[n]);
756 kidRow[n] = -1; // flag child available
757 pn = n;
758 }
759 return pn;
760 }
761
762 // Update row completion bitmap and update outputs (does not change rInPos)
763 bool
764 RcontribSimulManager::UpdateRowsDone(int r)
765 {
766 if (!rowsDone.TestAndSet(r)) {
767 error(WARNING, "redundant call to UpdateRowsDone()");
768 return false;
769 }
770 GetRowFinished();
771 if (nrDone <= r)
772 return true; // nothing to update, yet
773 for (RcontribOutput *op = outList; op; op = op->next)
774 if (!op->SetRowsDone(nrDone))
775 return false;
776 return true; // up-to-date
777 }
778
779 // Run rcontrib child process (never returns)
780 void
781 RcontribSimulManager::RunChild()
782 {
783 FVECT * vecList = NULL;
784 RowAssignment rass;
785 ssize_t nr;
786
787 accum = 0;
788 errno = 0;
789 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
790 if (!rass.ac) {
791 error(CONSISTENCY, "bad accumulator count in child");
792 exit(1);
793 }
794 if (rass.ac > accum) {
795 efree(vecList);
796 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
797 }
798 accum = rass.ac;
799 rInPos = rass.row;
800
801 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
802 sizeof(FVECT)*2*accum)
803 break;
804
805 if (ComputeRecord(vecList) <= 0)
806 exit(1);
807 // signal this row is done
808 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
809 exit(1);
810 }
811 if (nr) {
812 error(SYSTEM, "read error in child process");
813 exit(1);
814 }
815 exit(0);
816 }
817
818 // Add child processes as indicated
819 bool
820 RcontribSimulManager::StartKids(int n2go)
821 {
822 if ((n2go <= 0) | (nkids + n2go <= 1))
823 return false;
824
825 if (!nkids)
826 cow_memshare(); // preload objects
827
828 n2go += nkids; // => desired brood size
829 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
830 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
831
832 fflush(stdout); // shouldn't use, anyway
833 while (nkids < n2go) {
834 kid[nkids] = sp_inactive;
835 int rv = open_process(&kid[nkids], NULL);
836 if (!rv) { // in child process?
837 while (nkids-- > 0) {
838 close(kid[nkids].r);
839 close(kid[nkids].w);
840 }
841 free(kid); free(kidRow);
842 kid = NULL; kidRow = NULL;
843 RunChild(); // should never return
844 _exit(1);
845 }
846 if (rv < 0) {
847 error(SYSTEM, "cannot fork worker process");
848 return false;
849 }
850 kidRow[nkids++] = -1; // newborn is ready
851 }
852 return true;
853 }
854
855 // Reap the indicated number of children (all if 0)
856 int
857 RcontribSimulManager::StopKids(int n2end)
858 {
859 if (nkids <= 0)
860 return 0;
861 FlushQueue();
862 int status = 0;
863 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
864 status = close_processes(kid, nkids);
865 free(kid); free(kidRow);
866 kid = NULL; kidRow = NULL;
867 nkids = 0;
868 } else { // else shrink family
869 int st;
870 while (n2end-- > 0)
871 if ((st = close_process(&kid[--nkids])) > 0)
872 status = st;
873 // could call realloc(), but it hardly matters
874 }
875 if (status) {
876 sprintf(errmsg, "non-zero (%d) status from child", status);
877 error(WARNING, errmsg);
878 }
879 return status;
880 }
881
882 // Set number of computation threads (0 => #cores)
883 int
884 RcontribSimulManager::SetThreadCount(int nt)
885 {
886 if (!Ready()) {
887 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
888 return 0;
889 }
890 if (nt < 0)
891 return NThreads();
892 if (!nt) nt = GetNCores();
893 int status = 0;
894 if (nt == 1)
895 status = StopKids();
896 else if (nt < nkids)
897 status = StopKids(nkids-nt);
898 else if (nt > nkids)
899 StartKids(nt-nkids);
900 if (status) {
901 sprintf(errmsg, "non-zero (%d) status from child", status);
902 error(WARNING, errmsg);
903 }
904 return NThreads();
905 }