ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.19
Committed: Sat Oct 25 03:30:35 2025 UTC (7 days, 2 hours ago) by greg
Branch: MAIN
Changes since 2.18: +9 -10 lines
Log Message:
fix(rxcontrib,rxfluxmtx): Repaired logic with NCOLS= and -r + speed improvement

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.18 2025/10/24 18:37:05 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // allocate rcontrib accumulator
58 static RcontribMod *
59 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60 {
61 if (binexpr && !*binexpr) binexpr = NULL;
62 if (!prms) prms = "";
63 if ((ncbins > 1) & !binexpr) {
64 error(USER, "missing bin expression");
65 return NULL;
66 }
67 if (ncbins < 1) ncbins = 1;
68
69 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71 strlen(prms)+1);
72
73 if (binexpr) { // get/check bin expression
74 mp->binv = eparse(const_cast<char *>(binexpr));
75 if (mp->binv->type == NUM) { // constant expression (0)?
76 if ((int)evalue(mp->binv) != 0) {
77 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 binexpr);
79 error(USER, errmsg);
80 }
81 if (ncbins > 1) {
82 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83 error(USER, errmsg);
84 }
85 epfree(mp->binv, true);
86 mp->binv = NULL;
87 prms = "";
88 ncbins = 1;
89 }
90 }
91 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Get format identifier
107 const char *
108 formstr(int f)
109 {
110 switch (f) {
111 case 'a': return("ascii");
112 case 'f': return("float");
113 case 'd': return("double");
114 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115 }
116 return("unknown");
117 }
118
119 // Standard file data share function
120 RdataShare *
121 fileDataShare(const char *name, RCOutputOp op, size_t siz)
122 {
123 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
124 sprintf(errmsg, "cannot recover from '%s'", name);
125 error(SYSTEM, errmsg);
126 return NULL;
127 }
128 return new RdataShareFile(name, RSDOflags[op], siz);
129 }
130
131 // Memory-mapped data share function
132 RdataShare *
133 mapDataShare(const char *name, RCOutputOp op, size_t siz)
134 {
135 if (op == RCOrecover && access(name, R_OK|W_OK) < 0) {
136 sprintf(errmsg, "cannot recover from '%s'", name);
137 error(SYSTEM, errmsg);
138 return NULL;
139 }
140 return new RdataShareMap(name, RSDOflags[op], siz);
141 }
142
143 // Set output format ('f', 'd', or 'c')
144 bool
145 RcontribSimulManager::SetDataFormat(int ty)
146 {
147 if (outList) {
148 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
149 return false;
150 }
151 switch (ty) {
152 case 'f':
153 dsiz = sizeof(float)*NCSAMP;
154 break;
155 case 'd':
156 dsiz = sizeof(double)*NCSAMP;
157 break;
158 case 'c':
159 dsiz = LSCOLR;
160 break;
161 default:
162 sprintf(errmsg, "unsupported output format '%c'", ty);
163 error(INTERNAL, errmsg);
164 return false;
165 }
166 dtyp = ty;
167 return true;
168 }
169
170 // static call-back for rcontrib ray-tracing
171 int
172 RcontribSimulManager::RctCall(RAY *r, void *cd)
173 {
174 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
175 return 0;
176 // shadow ray not on source?
177 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
178 return 0;
179
180 const char * mname = objptr(r->ro->omod)->oname;
181 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
182 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
183 if (!mp)
184 return 0; // not in our modifier list
185
186 int bi = 0; // get bin index
187 if (mp->binv) {
188 worldfunc(RCCONTEXT, r);
189 set_eparams(mp->params);
190 double bval = evalue(mp->binv);
191 if (bval <= -.5)
192 return 0; // silently ignore negative bin index
193 bi = int(bval + .5);
194 }
195 DCOLORV * dvp = (*mp)[bi];
196 if (!dvp) {
197 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
198 error(WARNING, errmsg);
199 return 0;
200 }
201 SCOLOR contr;
202 raycontrib(contr, r, PRIMARY); // compute coefficient
203 if (rcp->HasFlag(RCcontrib))
204 smultscolor(contr, r->rcol); // -> value contribution
205
206 for (int i = 0; i < NCSAMP; i++)
207 *dvp++ += contr[i]; // accumulate color/spectrum
208 return 1;
209 }
210
// Look for a printf-style conversion in tst whose conversion character
// is one of those in match.  Returns the index in tst of that conversion
// character (always > 0), or 0 if there is no such conversion.
// Flags, widths, etc. between '%' and the conversion character are skipped.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
		while (*cp && *cp != '%')	// advance to next '%'
			cp++;
		if (!*cp)
			return 0;		// no more conversions
		cp++;				// step past the '%'
						// strchr() also stops us at '\0'
		while (!strchr(convChars, *cp))
			cp++;
		if (!*cp)
			return 0;		// unterminated conversion
		if (strchr(match, *cp))
			return int(cp - tst);	// found one we want
		cp++;				// some other conversion; keep going
	}
}
228
229 // Add a modifier and arguments, opening output file(s)
230 bool
231 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
232 const char *prms, const char *binval, int bincnt)
233 {
234 if (!modn | !outspec || !*modn | !*outspec) {
235 error(WARNING, "ignoring bad call to AddModifier()");
236 return false;
237 }
238 if (*outspec == '!') {
239 error(USER, "command output not supported by RcontribSimulManager");
240 return false;
241 }
242 if (!nChan) { // initial call?
243 if ((xres < 0) | (yres <= 0)) {
244 error(USER, "xres, yres must be set before first modifier");
245 return false;
246 }
247 if (!SetDataFormat(dtyp))
248 return false;
249 nChan = NCSAMP;
250 } else if (nChan != NCSAMP) {
251 error(USER, "# spectral channels must be fixed in AddModifier()");
252 return false;
253 }
254 if (Ready()) {
255 error(INTERNAL, "call to AddModifier() after PrepOutput()");
256 return false;
257 }
258 LUENT * lp = lu_find(&modLUT, modn);
259 if (lp->data) {
260 sprintf(errmsg, "duplicate modifier '%s'", modn);
261 error(USER, errmsg);
262 return false;
263 }
264 if (!lp->key) // create new entry
265 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
266
267 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
268 if (!mp) return false;
269 lp->data = (char *)mp;
270 const int modndx = hasFormat(outspec, "sb");
271 const int binndx = hasFormat(outspec, "diouxX");
272 int bin0 = 0;
273 char fnbuf[512];
274 if (!modndx || (binndx > 0) & (modndx > binndx))
275 sprintf(fnbuf, outspec, bin0, modn);
276 else
277 sprintf(fnbuf, outspec, modn, bin0);
278 RcontribOutput * olast = NULL;
279 RcontribOutput * op;
280 for (op = outList; op; op = (olast=op)->next) {
281 if (strcmp(op->GetName(), fnbuf))
282 continue;
283 if (modndx) { // this ain't right
284 sprintf(errmsg, "output name collision for '%s'", fnbuf);
285 error(USER, errmsg);
286 return false;
287 }
288 if ((binndx > 0) & (xres > 0)) {
289 sprintf(fnbuf, outspec, ++bin0);
290 continue; // each bin goes to another image
291 }
292 mp->coffset = op->rowBytes; // else add to what's there
293 break;
294 }
295 if (!op) { // create new output channel?
296 op = new RcontribOutput(fnbuf);
297 if (olast) olast->next = op;
298 else outList = op;
299 }
300 mp->opl = op; // first (maybe only) output channel
301 if (modndx) // remember modifier if part of name
302 op->omod = lp->key;
303 if (binndx) { // append output image/bin list
304 op->rowBytes += dsiz;
305 op->obin = bin0;
306 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
307 if (!modndx || (binndx > 0) & (modndx > binndx))
308 sprintf(fnbuf, outspec, bin0+bincnt, modn);
309 else
310 sprintf(fnbuf, outspec, modn, bin0+bincnt);
311 if (!op->next) {
312 olast = op;
313 olast->next = op = new RcontribOutput(fnbuf);
314 if (modndx) op->omod = lp->key;
315 op->obin = bin0+bincnt;
316 } else {
317 op = op->next;
318 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
319 "bin number mismatch in AddModifier()");
320 }
321 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
322 "row offset mismatch in AddModifier()");
323 op->rowBytes += dsiz;
324 }
325 } else // else send all results to this channel
326 op->rowBytes += mp->nbins*dsiz;
327 return true;
328 }
329
330 // Add a file of modifiers with associated arguments
331 bool
332 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
333 const char *prms,
334 const char *binval, int bincnt)
335 {
336 char * path = getpath(const_cast<char *>(modfn),
337 getrlibpath(), R_OK);
338 FILE * fp;
339 if (!path || !(fp = fopen(path, "r"))) {
340 sprintf(errmsg, "cannot %s modifier file '%s'",
341 path ? "open" : "find", modfn);
342 error(SYSTEM, errmsg);
343 return false;
344 }
345 char mod[MAXSTR];
346 while (fgetword(mod, sizeof(mod), fp))
347 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
348 fclose(fp);
349 return false;
350 }
351 fclose(fp);
352 return true;
353 }
354
355 // call-back to check if modifier has been loaded
356 static int
357 checkModExists(const LUENT *lp, void *p)
358 {
359 OBJECT mod = modifier(lp->key);
360
361 if ((mod != OVOID) & (mod < nsceneobjs))
362 return 1;
363
364 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
365 error(WARNING, errmsg);
366 return 0;
367 }
368
369 // Prepare output channels and return # completed rows
370 int
371 RcontribSimulManager::PrepOutput()
372 {
373 if (!outList || !RtraceSimulManager::Ready()) {
374 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
375 return -1;
376 }
377 if (!cdsF) {
378 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
379 return -1;
380 }
381 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
382 return -1;
383
384 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
385 int remWarnings = 20;
386 for (RcontribOutput *op = outList; op; op = op->next) {
387 if (op->rData) {
388 error(INTERNAL, "output channel already open in PrepOutput()");
389 return -1;
390 }
391 op->nRows = outList->nRows;
392 op->rData = (*cdsF)(op->ofname, outOp,
393 GetHeadLen()+1024 + op->nRows*op->rowBytes);
394 freeqstr(op->ofname); op->ofname = NULL;
395 if (outOp == RCOrecover) {
396 int rd = op->CheckHeader(this);
397 if (rd < 0)
398 return -1;
399 if (rd >= op->nRows) {
400 if (remWarnings > 0) {
401 sprintf(errmsg, "recovered output '%s' is complete",
402 op->GetName());
403 error(WARNING, --remWarnings ? errmsg : "etc...");
404 }
405 rd = op->nRows;
406 }
407 if (!rInPos | (rInPos > rd)) rInPos = rd;
408 } else if (!op->NewHeader(this))
409 return -1;
410 // make sure there's room
411 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
412 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
413 return -1; // calls error() for us
414 }
415 rowsDone.NewBitMap(outList->nRows); // create row completion map
416 rowsDone.ClearBits(0, rInPos, true);
417 return nrDone = rInPos;
418 }
419
420 // Create header in open write-only channel
421 bool
422 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
423 {
424 int headlim = rcp->GetHeadLen() + 1024;
425 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
426 int esiz;
427 const int etyp = rcp->GetFormat(&esiz);
428
429 strcpy(hdr, HDRSTR);
430 strcat(hdr, "RADIANCE\n");
431 begData = strlen(hdr);
432 strcpy(hdr+begData, rcp->GetHeadStr());
433 begData += rcp->GetHeadLen();
434 if (omod) {
435 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
436 begData += strlen(hdr+begData);
437 }
438 if (obin >= 0) {
439 sprintf(hdr+begData, "BIN=%d\n", obin);
440 begData += strlen(hdr+begData);
441 }
442 strcpy(hdr+begData, ROWZEROSTR);
443 rowCountPos = begData+LNROWSTR;
444 begData += sizeof(ROWZEROSTR)-1;
445 if (!rcp->xres | (rowBytes > esiz)) {
446 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
447 begData += strlen(hdr+begData);
448 }
449 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
450 begData += strlen(hdr+begData);
451 if (NCSAMP > 3) {
452 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
453 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
454 begData += strlen(hdr+begData);
455 }
456 if (etyp != 'c') {
457 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
458 begData += strlen(hdr+begData);
459 }
460 int align = 0;
461 switch (etyp) {
462 case 'f':
463 align = sizeof(float);
464 break;
465 case 'd':
466 align = sizeof(double);
467 break;
468 case 'c':
469 break;
470 default:
471 error(INTERNAL, "unsupported data type in NewHeader()");
472 return false;
473 }
474 strcpy(hdr+begData, FMTSTR); // write format string
475 begData += strlen(hdr+begData);
476 strcpy(hdr+begData, formstr(etyp));
477 begData += strlen(hdr+begData);
478 if (align) // align data at end of header
479 while ((begData+2) % align)
480 hdr[begData++] = ' ';
481 hdr[begData++] = '\n'; // EOL for data format
482 hdr[begData++] = '\n'; // end of nominal header
483 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
484 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
485 begData += strlen(hdr+begData);
486 }
487 return rData->ReleaseMemory(hdr, RDSwrite);
488 }
489
490 // find string argument in header for named variable
491 static const char *
492 findArgs(const char *hdr, const char *vnm, int len)
493 {
494 const char * npos = strnstr(hdr, vnm, len);
495
496 if (!npos) return NULL;
497
498 npos += strlen(vnm); // find start of (first) argument
499 len -= npos - hdr;
500 while (len-- > 0 && (*npos == '=') | isspace(*npos))
501 if (*npos++ == '\n')
502 return NULL;
503
504 return (len >= 0) ? npos : NULL;
505 }
506
507 // Load and check header in read/write channel
508 int
509 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
510 {
511 int esiz;
512 const int etyp = rcp->GetFormat(&esiz);
513 const int maxlen = rcp->GetHeadLen() + 1024;
514 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
515 const char * cp;
516 // find end of header
517 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
518 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
519 error(USER, errmsg);
520 return -1;
521 }
522 begData = cp - hdr + 1; // increment again at end
523 // check # components
524 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
525 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
526 error(USER, errmsg);
527 return -1;
528 }
529 // check format
530 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
531 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
532 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
533 error(USER, errmsg);
534 return -1;
535 }
536 // check #columns
537 if ((cp = findArgs(hdr, "NCOLS=", begData)) ? (atoi(cp)*esiz != rowBytes)
538 : (!rcp->xres | (rowBytes > esiz))) {
539 sprintf(errmsg, "expected NCOLS=%d in '%s'", int(rowBytes/esiz), GetName());
540 error(USER, errmsg);
541 return -1;
542 }
543 // find row count
544 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
545 sprintf(errmsg, "missing NROWS in '%s'", GetName());
546 error(USER, errmsg);
547 return -1;
548 }
549 rowCountPos = cp - hdr;
550 int rlast = atoi(cp);
551 begData++; // advance past closing EOL
552 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
553 char rbuf[64];
554 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
555 int rlen = strlen(rbuf);
556 if (strncmp(rbuf, hdr+begData, rlen)) {
557 sprintf(errmsg, "bad resolution string in '%s'", GetName());
558 error(USER, errmsg);
559 return -1;
560 }
561 begData += rlen;
562 }
563 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
564 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
565 }
566
567 // Rewind calculation (previous results unchanged)
568 bool
569 RcontribSimulManager::ResetRow(int r)
570 {
571 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
572 error(WARNING, "ignoring bad call to ResetRow()");
573 return (r == rInPos);
574 }
575 FlushQueue(); // finish current and reset
576 for (RcontribOutput *op = outList; op; op = op->next)
577 if (!op->SetRowsDone(r))
578 return false;
579
580 rowsDone.ClearBits(r, rInPos-r, false);
581 nrDone = rInPos = r;
582 return true;
583 }
584
585 // call-back for averaging each modifier's contributions to assigned channel(s)
586 static int
587 putModContrib(const LUENT *lp, void *p)
588 {
589 RcontribMod * mp = (RcontribMod *)lp->data;
590 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
591 const double sca = 1./rcp->accum;
592 const DCOLORV * dvp = mp->cbin;
593 RcontribOutput * op = mp->opl;
594 int i, n;
595
596 switch (rcp->GetFormat()) { // conversion based on output type
597 case 'd': {
598 double * dvo = (double *)op->InsertionP(mp->coffset);
599 for (n = mp->nbins; n--; ) {
600 for (i = 0; i < NCSAMP; i++)
601 *dvo++ = *dvp++ * sca;
602 if ((op->obin >= 0) & (n > 0))
603 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
604 }
605 } break;
606 case 'f': {
607 float * fvo = (float *)op->InsertionP(mp->coffset);
608 for (n = mp->nbins; n--; ) {
609 for (i = 0; i < NCSAMP; i++)
610 *fvo++ = float(*dvp++ * sca);
611 if ((op->obin >= 0) & (n > 0))
612 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
613 }
614 } break;
615 case 'c': {
616 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
617 for (n = mp->nbins; n--; ) {
618 SCOLOR scol;
619 for (i = 0; i < NCSAMP; i++)
620 scol[i] = COLORV(*dvp++ * sca);
621 scolor_scolr(cvo, scol);
622 cvo += LSCOLR;
623 if ((op->obin >= 0) & (n > 0))
624 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
625 }
626 } break;
627 default:
628 error(CONSISTENCY, "unsupported output type in putModContrib()");
629 return -1;
630 }
631 // clear for next tally
632 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
633 return 1;
634 }
635
636 // Add a ray/bundle to compute next record
637 int
638 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
639 {
640 if (!Ready())
641 return 0;
642 if (rInPos >= outList->nRows) {
643 error(WARNING, "ComputeRecord() called after last record");
644 return 0;
645 }
646 if (nkids > 0) { // in parent process?
647 int k = GetChild(false); // updates output rows
648 if (k < 0) return -1; // someone died?
649 RowAssignment rass;
650 rass.row = kidRow[k] = rInPos++;
651 rass.ac = accum;
652 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
653 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
654 != sizeof(FVECT)*2*accum) {
655 error(SYSTEM, "cannot write to child; dead process?");
656 return -1;
657 }
658 return accum; // tracing/output happens in child
659 }
660 if (NCSAMP != nChan) {
661 error(INTERNAL, "number of color channels must remain fixed");
662 return -1;
663 }
664 // actual work is done here...
665 if (EnqueueBundle(orig_direc, accum) < 0)
666 return -1;
667
668 RcontribOutput * op; // prepare output buffers
669 for (op = outList; op; op = op->next)
670 if (!op->GetRow(rInPos))
671 return -1;
672 // convert averages & clear
673 if (lu_doall(&modLUT, putModContrib, this) < 0)
674 return -1;
675 // write buffers
676 for (op = outList; op; op = op->next)
677 op->DoneRow();
678
679 if (!nkids) // update row if solo process
680 UpdateRowsDone(rInPos++); // XXX increment here is critical
681
682 return accum;
683 }
684
685 // Get next available child, returning index or -1 if forceWait & idling
686 int
687 RcontribSimulManager::GetChild(bool forceWait)
688 {
689 if (nkids <= 0)
690 return -1;
691 // take inventory
692 int pn, n = 0;
693 fd_set readset, errset;
694 FD_ZERO(&readset); FD_ZERO(&errset);
695 for (pn = nkids; pn--; ) {
696 if (kidRow[pn] < 0) { // child already ready?
697 if (forceWait) continue;
698 return pn; // good enough
699 }
700 FD_SET(kid[pn].r, &readset); // will check on this one
701 FD_SET(kid[pn].r, &errset);
702 if (kid[pn].r >= n)
703 n = kid[pn].r + 1;
704 }
705 if (!n) // every child is idle?
706 return -1;
707 // wait on "busy" child(ren)
708 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
709 if (errno != EINTR) {
710 error(SYSTEM, "select call failed in GetChild()");
711 return -1;
712 }
713 char buf[sizeof(ROW_DONE)] = "X";
714 pn = -1; // get flags set by select
715 for (n = nkids; n--; )
716 if (kidRow[n] >= 0 &&
717 FD_ISSET(kid[n].r, &readset) |
718 FD_ISSET(kid[n].r, &errset)) {
719 // check for error
720 if (FD_ISSET(kid[n].r, &errset) ||
721 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
722 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
723 return -1;
724 // update output row counts
725 UpdateRowsDone(kidRow[n]);
726 kidRow[n] = -1; // flag child available
727 pn = n;
728 }
729 return pn;
730 }
731
732 // Update row completion bitmap and update outputs (does not change rInPos)
733 bool
734 RcontribSimulManager::UpdateRowsDone(int r)
735 {
736 if (!rowsDone.TestAndSet(r)) {
737 error(WARNING, "redundant call to UpdateRowsDone()");
738 return false;
739 }
740 GetRowFinished();
741 if (nrDone <= r)
742 return true; // nothing to update, yet
743 for (RcontribOutput *op = outList; op; op = op->next)
744 if (!op->SetRowsDone(nrDone))
745 return false;
746 return true; // up-to-date
747 }
748
749 // Run rcontrib child process (never returns)
750 void
751 RcontribSimulManager::RunChild()
752 {
753 FVECT * vecList = NULL;
754 RowAssignment rass;
755 ssize_t nr;
756
757 accum = 0;
758 errno = 0;
759 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
760 if (!rass.ac) {
761 error(CONSISTENCY, "bad accumulator count in child");
762 exit(1);
763 }
764 if (rass.ac > accum) {
765 efree(vecList);
766 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
767 }
768 accum = rass.ac;
769 rInPos = rass.row;
770
771 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
772 sizeof(FVECT)*2*accum)
773 break;
774
775 if (ComputeRecord(vecList) <= 0)
776 exit(1);
777 // signal this row is done
778 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
779 exit(1);
780 }
781 if (nr) {
782 error(SYSTEM, "read error in child process");
783 exit(1);
784 }
785 exit(0);
786 }
787
788 // Add child processes as indicated
789 bool
790 RcontribSimulManager::StartKids(int n2go)
791 {
792 if ((n2go <= 0) | (nkids + n2go <= 1))
793 return false;
794
795 if (!nkids)
796 cow_memshare(); // preload objects
797
798 n2go += nkids; // => desired brood size
799 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
800 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
801
802 fflush(stdout); // shouldn't use, anyway
803 while (nkids < n2go) {
804 kid[nkids] = sp_inactive;
805 int rv = open_process(&kid[nkids], NULL);
806 if (!rv) { // in child process?
807 while (nkids-- > 0) {
808 close(kid[nkids].r);
809 close(kid[nkids].w);
810 }
811 free(kid); free(kidRow);
812 kid = NULL; kidRow = NULL;
813 RunChild(); // should never return
814 _exit(1);
815 }
816 if (rv < 0) {
817 error(SYSTEM, "cannot fork worker process");
818 return false;
819 }
820 kidRow[nkids++] = -1; // newborn is ready
821 }
822 return true;
823 }
824
825 // Reap the indicated number of children (all if 0)
826 int
827 RcontribSimulManager::StopKids(int n2end)
828 {
829 if (nkids <= 0)
830 return 0;
831 FlushQueue();
832 int status = 0;
833 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
834 status = close_processes(kid, nkids);
835 free(kid); free(kidRow);
836 kid = NULL; kidRow = NULL;
837 nkids = 0;
838 } else { // else shrink family
839 int st;
840 while (n2end-- > 0)
841 if ((st = close_process(&kid[--nkids])) > 0)
842 status = st;
843 // could call realloc(), but it hardly matters
844 }
845 if (status) {
846 sprintf(errmsg, "non-zero (%d) status from child", status);
847 error(WARNING, errmsg);
848 }
849 return status;
850 }
851
852 // Set number of computation threads (0 => #cores)
853 int
854 RcontribSimulManager::SetThreadCount(int nt)
855 {
856 if (!Ready()) {
857 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
858 return 0;
859 }
860 if (nt < 0)
861 return NThreads();
862 if (!nt) nt = GetNCores();
863 int status = 0;
864 if (nt == 1)
865 status = StopKids();
866 else if (nt < nkids)
867 status = StopKids(nkids-nt);
868 else if (nt > nkids)
869 StartKids(nt-nkids);
870 if (status) {
871 sprintf(errmsg, "non-zero (%d) status from child", status);
872 error(WARNING, errmsg);
873 }
874 return NThreads();
875 }