ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/src/rt/RcontribSimulManager.cpp
Revision: 2.16
Committed: Thu Oct 23 01:26:48 2025 UTC (24 hours, 37 minutes ago) by greg
Branch: MAIN
Changes since 2.15: +2 -1 lines
Log Message:
fix(rxfluxmtx): Added -t option and numerous bug fixes

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.15 2025/10/17 17:43:53 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // allocate rcontrib accumulator
58 static RcontribMod *
59 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60 {
61 if (binexpr && !*binexpr) binexpr = NULL;
62 if (!prms) prms = "";
63 if ((ncbins > 1) & !binexpr) {
64 error(USER, "missing bin expression");
65 return NULL;
66 }
67 if (ncbins < 1) ncbins = 1;
68
69 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71 strlen(prms)+1);
72
73 if (binexpr) { // get/check bin expression
74 mp->binv = eparse(const_cast<char *>(binexpr));
75 if (mp->binv->type == NUM) { // constant expression (0)?
76 if ((int)evalue(mp->binv) != 0) {
77 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 binexpr);
79 error(USER, errmsg);
80 }
81 if (ncbins > 1) {
82 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83 error(USER, errmsg);
84 }
85 epfree(mp->binv, true);
86 mp->binv = NULL;
87 prms = "";
88 ncbins = 1;
89 }
90 }
91 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Get format identifier
107 const char *
108 formstr(int f)
109 {
110 switch (f) {
111 case 'a': return("ascii");
112 case 'f': return("float");
113 case 'd': return("double");
114 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115 }
116 return("unknown");
117 }
118
119 // Our default data share function
120 RdataShare *
121 defDataShare(const char *name, RCOutputOp op, size_t siz)
122 {
123 return new RdataShareMap(name, RSDOflags[op], siz);
124 }
125
126 // Set output format ('f', 'd', or 'c')
127 bool
128 RcontribSimulManager::SetDataFormat(int ty)
129 {
130 if (outList) {
131 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
132 return false;
133 }
134 switch (ty) {
135 case 'f':
136 dsiz = sizeof(float)*NCSAMP;
137 break;
138 case 'd':
139 dsiz = sizeof(double)*NCSAMP;
140 break;
141 case 'c':
142 dsiz = LSCOLR;
143 break;
144 default:
145 sprintf(errmsg, "unsupported output format '%c'", ty);
146 error(INTERNAL, errmsg);
147 return false;
148 }
149 dtyp = ty;
150 return true;
151 }
152
153 // static call-back for rcontrib ray-tracing
154 int
155 RcontribSimulManager::RctCall(RAY *r, void *cd)
156 {
157 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
158 return 0;
159 // shadow ray not on source?
160 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
161 return 0;
162
163 const char * mname = objptr(r->ro->omod)->oname;
164 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
165 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
166 if (!mp)
167 return 0; // not in our modifier list
168
169 int bi = 0; // get bin index
170 if (mp->binv) {
171 worldfunc(RCCONTEXT, r);
172 set_eparams(mp->params);
173 double bval = evalue(mp->binv);
174 if (bval <= -.5)
175 return 0; // silently ignore negative bin index
176 bi = int(bval + .5);
177 }
178 DCOLORV * dvp = (*mp)[bi];
179 if (!dvp) {
180 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
181 error(WARNING, errmsg);
182 return 0;
183 }
184 SCOLOR contr;
185 raycontrib(contr, r, PRIMARY); // compute coefficient
186 if (rcp->HasFlag(RCcontrib))
187 smultscolor(contr, r->rcol); // -> value contribution
188
189 for (int i = 0; i < NCSAMP; i++)
190 *dvp++ += contr[i]; // accumulate color/spectrum
191 return 1;
192 }
193
// Look for the first conversion in tst whose type character appears in match.
// Returns the index of that type character, or 0 if an earlier conversion
// ends in a different type or the string has no such conversion.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";

	for (const char *cp = tst; *cp; ) {
		if (*cp++ != '%')
			continue;
					// skip flags/width to the type character
		while (strchr(convChars, *cp) == NULL)
			++cp;
					// strchr() also matches the terminating nul
		if (strchr(match, *cp) != NULL)
			return (*cp != '\0') ? int(cp - tst) : 0;
		++cp;			// some other conversion (or "%%")
	}
	return 0;
}
211
212 // Add a modifier and arguments, opening output file(s)
213 bool
214 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
215 const char *prms, const char *binval, int bincnt)
216 {
217 if (!modn | !outspec || !*modn | !*outspec) {
218 error(WARNING, "ignoring bad call to AddModifier()");
219 return false;
220 }
221 if (*outspec == '!') {
222 error(USER, "command output not supported by RcontribSimulManager");
223 return false;
224 }
225 if (!nChan) { // initial call?
226 if ((xres < 0) | (yres <= 0)) {
227 error(USER, "xres, yres must be set before first modifier");
228 return false;
229 }
230 if (!SetDataFormat(dtyp))
231 return false;
232 nChan = NCSAMP;
233 } else if (nChan != NCSAMP) {
234 error(USER, "# spectral channels must be fixed in AddModifier()");
235 return false;
236 }
237 if (Ready()) {
238 error(INTERNAL, "call to AddModifier() after PrepOutput()");
239 return false;
240 }
241 LUENT * lp = lu_find(&modLUT, modn);
242 if (lp->data) {
243 sprintf(errmsg, "duplicate modifier '%s'", modn);
244 error(USER, errmsg);
245 return false;
246 }
247 if (!lp->key) // create new entry
248 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
249
250 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
251 if (!mp) return false;
252 lp->data = (char *)mp;
253 const int modndx = hasFormat(outspec, "sb");
254 const int binndx = hasFormat(outspec, "diouxX");
255 int bin0 = 0;
256 char fnbuf[512];
257 if (!modndx || (binndx > 0) & (modndx > binndx))
258 sprintf(fnbuf, outspec, bin0, modn);
259 else
260 sprintf(fnbuf, outspec, modn, bin0);
261 RcontribOutput * olast = NULL;
262 RcontribOutput * op;
263 for (op = outList; op; op = (olast=op)->next) {
264 if (strcmp(op->GetName(), fnbuf))
265 continue;
266 if (modndx) { // this ain't right
267 sprintf(errmsg, "output name collision for '%s'", fnbuf);
268 error(USER, errmsg);
269 return false;
270 }
271 if ((binndx > 0) & (xres > 0)) {
272 sprintf(fnbuf, outspec, ++bin0);
273 continue; // each bin goes to another image
274 }
275 mp->coffset = op->rowBytes; // else add to what's there
276 break;
277 }
278 if (!op) { // create new output channel?
279 op = new RcontribOutput(fnbuf);
280 if (olast) olast->next = op;
281 else outList = op;
282 }
283 mp->opl = op; // first (maybe only) output channel
284 if (modndx) // remember modifier if part of name
285 op->omod = lp->key;
286 if (binndx) { // append output image/bin list
287 op->rowBytes += dsiz;
288 op->obin = bin0;
289 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
290 if (!modndx || (binndx > 0) & (modndx > binndx))
291 sprintf(fnbuf, outspec, bin0+bincnt, modn);
292 else
293 sprintf(fnbuf, outspec, modn, bin0+bincnt);
294 if (!op->next) {
295 olast = op;
296 olast->next = op = new RcontribOutput(fnbuf);
297 if (modndx) op->omod = lp->key;
298 op->obin = bin0+bincnt;
299 } else {
300 op = op->next;
301 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
302 "bin number mismatch in AddModifier()");
303 }
304 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
305 "row offset mismatch in AddModifier()");
306 op->rowBytes += dsiz;
307 }
308 } else // else send all results to this channel
309 op->rowBytes += mp->nbins*dsiz;
310 return true;
311 }
312
313 // Add a file of modifiers with associated arguments
314 bool
315 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
316 const char *prms,
317 const char *binval, int bincnt)
318 {
319 char * path = getpath(const_cast<char *>(modfn),
320 getrlibpath(), R_OK);
321 FILE * fp;
322 if (!path || !(fp = fopen(path, "r"))) {
323 sprintf(errmsg, "cannot %s modifier file '%s'",
324 path ? "open" : "find", modfn);
325 error(SYSTEM, errmsg);
326 return false;
327 }
328 char mod[MAXSTR];
329 while (fgetword(mod, sizeof(mod), fp))
330 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
331 fclose(fp);
332 return false;
333 }
334 fclose(fp);
335 return true;
336 }
337
338 // call-back to check if modifier has been loaded
339 static int
340 checkModExists(const LUENT *lp, void *p)
341 {
342 OBJECT mod = modifier(lp->key);
343
344 if ((mod != OVOID) & (mod < nsceneobjs))
345 return 1;
346
347 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
348 error(WARNING, errmsg);
349 return 0;
350 }
351
352 // Prepare output channels and return # completed rows
353 int
354 RcontribSimulManager::PrepOutput()
355 {
356 if (!outList || !RtraceSimulManager::Ready()) {
357 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
358 return -1;
359 }
360 if (!cdsF) {
361 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
362 return -1;
363 }
364 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
365 return -1;
366
367 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
368 int remWarnings = 20;
369 for (RcontribOutput *op = outList; op; op = op->next) {
370 if (op->rData) {
371 error(INTERNAL, "output channel already open in PrepOutput()");
372 return -1;
373 }
374 op->nRows = outList->nRows;
375 op->rData = (*cdsF)(op->ofname, outOp,
376 GetHeadLen()+1024 + op->nRows*op->rowBytes);
377 freeqstr(op->ofname); op->ofname = NULL;
378 if (outOp == RCOrecover) {
379 int rd = op->CheckHeader(this);
380 if (rd < 0)
381 return -1;
382 if (rd >= op->nRows) {
383 if (remWarnings >= 0) {
384 sprintf(errmsg, "recovered output '%s' is complete",
385 op->GetName());
386 error(WARNING, --remWarnings ? errmsg : "etc...");
387 }
388 rd = op->nRows;
389 }
390 if (!rInPos | (rInPos > rd)) rInPos = rd;
391 } else if (!op->NewHeader(this))
392 return -1;
393 // make sure there's room
394 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
395 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
396 return -1; // calls error() for us
397 }
398 rowsDone.NewBitMap(outList->nRows); // create row completion map
399 rowsDone.ClearBits(0, rInPos, true);
400 return rInPos;
401 }
402
403 // Create header in open write-only channel
404 bool
405 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
406 {
407 int headlim = rcp->GetHeadLen() + 1024;
408 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
409 int esiz;
410 const int etyp = rcp->GetFormat(&esiz);
411
412 strcpy(hdr, HDRSTR);
413 strcat(hdr, "RADIANCE\n");
414 begData = strlen(hdr);
415 strcpy(hdr+begData, rcp->GetHeadStr());
416 begData += rcp->GetHeadLen();
417 if (omod) {
418 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
419 begData += strlen(hdr+begData);
420 }
421 if (obin >= 0) {
422 sprintf(hdr+begData, "BIN=%d\n", obin);
423 begData += strlen(hdr+begData);
424 }
425 strcpy(hdr+begData, ROWZEROSTR);
426 rowCountPos = begData+LNROWSTR;
427 begData += sizeof(ROWZEROSTR)-1;
428 if (!rcp->xres | (rowBytes > esiz)) {
429 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
430 begData += strlen(hdr+begData);
431 }
432 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
433 begData += strlen(hdr+begData);
434 if (NCSAMP > 3) {
435 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
436 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
437 begData += strlen(hdr+begData);
438 }
439 if (etyp != 'c') {
440 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
441 begData += strlen(hdr+begData);
442 }
443 int align = 0;
444 switch (etyp) {
445 case 'f':
446 align = sizeof(float);
447 break;
448 case 'd':
449 align = sizeof(double);
450 break;
451 case 'c':
452 break;
453 default:
454 error(INTERNAL, "unsupported data type in NewHeader()");
455 return false;
456 }
457 strcpy(hdr+begData, FMTSTR); // write format string
458 begData += strlen(hdr+begData);
459 strcpy(hdr+begData, formstr(etyp));
460 begData += strlen(hdr+begData);
461 if (align) // align data at end of header
462 while ((begData+2) % align)
463 hdr[begData++] = ' ';
464 hdr[begData++] = '\n'; // EOL for data format
465 hdr[begData++] = '\n'; // end of nominal header
466 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
467 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
468 begData += strlen(hdr+begData);
469 }
470 return rData->ReleaseMemory(hdr, RDSwrite);
471 }
472
473 // find string argument in header for named variable
474 static const char *
475 findArgs(const char *hdr, const char *vnm, int len)
476 {
477 const char * npos = strnstr(hdr, vnm, len);
478
479 if (!npos) return NULL;
480
481 npos += strlen(vnm); // find start of (first) argument
482 len -= npos - hdr;
483 while (len-- > 0 && (*npos == '=') | isspace(*npos))
484 if (*npos++ == '\n')
485 return NULL;
486
487 return (len >= 0) ? npos : NULL;
488 }
489
490 // Load and check header in read/write channel
491 int
492 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
493 {
494 int esiz;
495 const int etyp = rcp->GetFormat(&esiz);
496 const int maxlen = rcp->GetHeadLen() + 1024;
497 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
498 const char * cp;
499 // find end of header
500 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
501 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
502 error(USER, errmsg);
503 return -1;
504 }
505 begData = cp - hdr + 1; // increment again at end
506 // check # components
507 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
508 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
509 error(USER, errmsg);
510 return -1;
511 }
512 // check format
513 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
514 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
515 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
516 error(USER, errmsg);
517 return -1;
518 }
519 // check #columns
520 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
521 !rcp->xres | (rowBytes > esiz)) {
522 sprintf(errmsg, "expected NCOLS=%d in '%s'",
523 int(rowBytes/esiz), GetName());
524 error(USER, errmsg);
525 return -1;
526 }
527 // find row count
528 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
529 sprintf(errmsg, "missing NROWS in '%s'", GetName());
530 error(USER, errmsg);
531 return -1;
532 }
533 rowCountPos = cp - hdr;
534 int rlast = atoi(cp);
535 begData++; // advance past closing EOL
536 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
537 char rbuf[64];
538 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
539 int rlen = strlen(rbuf);
540 if (strncmp(rbuf, hdr+begData, rlen)) {
541 sprintf(errmsg, "bad resolution string in '%s'", GetName());
542 error(USER, errmsg);
543 return -1;
544 }
545 begData += rlen;
546 }
547 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
548 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
549 }
550
551 // Rewind calculation (previous results unchanged)
552 bool
553 RcontribSimulManager::ResetRow(int r)
554 {
555 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
556 error(WARNING, "ignoring bad call to ResetRow()");
557 return (r == rInPos);
558 }
559 FlushQueue(); // finish current and reset
560 for (RcontribOutput *op = outList; op; op = op->next)
561 if (!op->SetRowsDone(r))
562 return false;
563
564 rowsDone.ClearBits(r, rInPos-r, false);
565 rInPos = r;
566 return true;
567 }
568
569 // call-back for averaging each modifier's contributions to assigned channel(s)
570 static int
571 putModContrib(const LUENT *lp, void *p)
572 {
573 RcontribMod * mp = (RcontribMod *)lp->data;
574 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
575 const double sca = 1./rcp->accum;
576 const DCOLORV * dvp = mp->cbin;
577 RcontribOutput * op = mp->opl;
578 int i, n;
579
580 switch (rcp->GetFormat()) { // conversion based on output type
581 case 'd': {
582 double * dvo = (double *)op->InsertionP(mp->coffset);
583 for (n = mp->nbins; n--; ) {
584 for (i = 0; i < NCSAMP; i++)
585 *dvo++ = *dvp++ * sca;
586 if ((op->obin >= 0) & (n > 0))
587 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
588 }
589 } break;
590 case 'f': {
591 float * fvo = (float *)op->InsertionP(mp->coffset);
592 for (n = mp->nbins; n--; ) {
593 for (i = 0; i < NCSAMP; i++)
594 *fvo++ = float(*dvp++ * sca);
595 if ((op->obin >= 0) & (n > 0))
596 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
597 }
598 } break;
599 case 'c': {
600 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
601 for (n = mp->nbins; n--; ) {
602 SCOLOR scol;
603 for (i = 0; i < NCSAMP; i++)
604 scol[i] = COLORV(*dvp++ * sca);
605 scolor_scolr(cvo, scol);
606 cvo += LSCOLR;
607 if ((op->obin >= 0) & (n > 0))
608 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
609 }
610 } break;
611 default:
612 error(CONSISTENCY, "unsupported output type in putModContrib()");
613 return -1;
614 }
615 // clear for next tally
616 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
617 return 1;
618 }
619
620 // Add a ray/bundle to compute next record
621 int
622 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
623 {
624 if (!Ready())
625 return 0;
626 if (rInPos >= outList->nRows) {
627 error(WARNING, "ComputeRecord() called after last record");
628 return 0;
629 }
630 if (nkids > 0) { // in parent process?
631 int k = GetChild(false); // updates output rows
632 if (k < 0) return -1; // someone died?
633 RowAssignment rass;
634 rass.row = kidRow[k] = rInPos++;
635 rass.ac = accum;
636 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
637 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
638 != sizeof(FVECT)*2*accum) {
639 error(SYSTEM, "cannot write to child; dead process?");
640 return -1;
641 }
642 return accum; // tracing/output happens in child
643 }
644 if (NCSAMP != nChan) {
645 error(INTERNAL, "number of color channels must remain fixed");
646 return -1;
647 }
648 // actual work is done here...
649 if (EnqueueBundle(orig_direc, accum) < 0)
650 return -1;
651
652 RcontribOutput * op; // prepare output buffers
653 for (op = outList; op; op = op->next)
654 if (!op->GetRow(rInPos))
655 return -1;
656 // convert averages & clear
657 if (lu_doall(&modLUT, putModContrib, this) < 0)
658 return -1;
659 // write buffers
660 for (op = outList; op; op = op->next)
661 op->DoneRow();
662
663 if (!nkids) // update row if solo process
664 UpdateRowsDone(rInPos++); // XXX increment here is critical
665
666 return accum;
667 }
668
669 // Get next available child, returning index or -1 if forceWait & idling
670 int
671 RcontribSimulManager::GetChild(bool forceWait)
672 {
673 if (nkids <= 0)
674 return -1;
675 // take inventory
676 int pn, n = 0;
677 fd_set readset, errset;
678 FD_ZERO(&readset); FD_ZERO(&errset);
679 for (pn = nkids; pn--; ) {
680 if (kidRow[pn] < 0) { // child already ready?
681 if (forceWait) continue;
682 return pn; // good enough
683 }
684 FD_SET(kid[pn].r, &readset); // will check on this one
685 FD_SET(kid[pn].r, &errset);
686 if (kid[pn].r >= n)
687 n = kid[pn].r + 1;
688 }
689 if (!n) // every child is idle?
690 return -1;
691 // wait on "busy" child(ren)
692 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
693 if (errno != EINTR) {
694 error(SYSTEM, "select call failed in GetChild()");
695 return -1;
696 }
697 char buf[sizeof(ROW_DONE)] = "X";
698 pn = -1; // get flags set by select
699 for (n = nkids; n--; )
700 if (kidRow[n] >= 0 &&
701 FD_ISSET(kid[n].r, &readset) |
702 FD_ISSET(kid[n].r, &errset)) {
703 // check for error
704 if (FD_ISSET(kid[n].r, &errset) ||
705 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
706 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
707 return -1;
708 // update output row counts
709 UpdateRowsDone(kidRow[n]);
710 kidRow[n] = -1; // flag child available
711 pn = n;
712 }
713 return pn;
714 }
715
716 // Update row completion bitmap and update outputs (does not change rInPos)
717 bool
718 RcontribSimulManager::UpdateRowsDone(int r)
719 {
720 if (!rowsDone.TestAndSet(r)) {
721 error(WARNING, "redundant call to UpdateRowsDone()");
722 return false;
723 }
724 int nDone = GetRowFinished();
725 if (nDone <= r)
726 return true; // nothing to update, yet
727 for (RcontribOutput *op = outList; op; op = op->next)
728 if (!op->SetRowsDone(nDone))
729 return false;
730 return true; // up-to-date
731 }
732
733 // Run rcontrib child process (never returns)
734 void
735 RcontribSimulManager::RunChild()
736 {
737 FVECT * vecList = NULL;
738 RowAssignment rass;
739 ssize_t nr;
740
741 accum = 0;
742 errno = 0;
743 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
744 if (!rass.ac) {
745 error(CONSISTENCY, "bad accumulator count in child");
746 exit(1);
747 }
748 if (rass.ac > accum) {
749 efree(vecList);
750 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
751 }
752 accum = rass.ac;
753 rInPos = rass.row;
754
755 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
756 sizeof(FVECT)*2*accum)
757 break;
758
759 if (ComputeRecord(vecList) <= 0)
760 exit(1);
761 // signal this row is done
762 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
763 exit(1);
764 }
765 if (nr) {
766 error(SYSTEM, "read error in child process");
767 exit(1);
768 }
769 exit(0);
770 }
771
772 // Add child processes as indicated
773 bool
774 RcontribSimulManager::StartKids(int n2go)
775 {
776 if ((n2go <= 0) | (nkids + n2go <= 1))
777 return false;
778
779 if (!nkids)
780 cow_memshare(); // preload objects
781
782 n2go += nkids; // => desired brood size
783 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
784 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
785
786 fflush(stdout); // shouldn't use, anyway
787 while (nkids < n2go) {
788 kid[nkids] = sp_inactive;
789 int rv = open_process(&kid[nkids], NULL);
790 if (!rv) { // in child process?
791 while (nkids-- > 0) {
792 close(kid[nkids].r);
793 close(kid[nkids].w);
794 }
795 free(kid); free(kidRow);
796 kid = NULL; kidRow = NULL;
797 RunChild(); // should never return
798 _exit(1);
799 }
800 if (rv < 0) {
801 error(SYSTEM, "cannot fork worker process");
802 return false;
803 }
804 kidRow[nkids++] = -1; // newborn is ready
805 }
806 return true;
807 }
808
809 // Reap the indicated number of children (all if 0)
810 int
811 RcontribSimulManager::StopKids(int n2end)
812 {
813 if (nkids <= 0)
814 return 0;
815 FlushQueue();
816 int status = 0;
817 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
818 status = close_processes(kid, nkids);
819 free(kid); free(kidRow);
820 kid = NULL; kidRow = NULL;
821 nkids = 0;
822 } else { // else shrink family
823 int st;
824 while (n2end-- > 0)
825 if ((st = close_process(&kid[--nkids])) > 0)
826 status = st;
827 // could call realloc(), but it hardly matters
828 }
829 if (status) {
830 sprintf(errmsg, "non-zero (%d) status from child", status);
831 error(WARNING, errmsg);
832 }
833 return status;
834 }
835
836 // Set number of computation threads (0 => #cores)
837 int
838 RcontribSimulManager::SetThreadCount(int nt)
839 {
840 if (!Ready()) {
841 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
842 return 0;
843 }
844 if (nt < 0)
845 return NThreads();
846 if (!nt) nt = GetNCores();
847 int status = 0;
848 if (nt == 1)
849 status = StopKids();
850 else if (nt < nkids)
851 status = StopKids(nkids-nt);
852 else if (nt > nkids)
853 StartKids(nt-nkids);
854 if (status) {
855 sprintf(errmsg, "non-zero (%d) status from child", status);
856 error(WARNING, errmsg);
857 }
858 return NThreads();
859 }