ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/src/rt/RcontribSimulManager.cpp
Revision: 2.13
Committed: Thu Oct 16 23:14:31 2025 UTC (3 days, 6 hours ago) by greg
Branch: MAIN
Changes since 2.12: +6 -5 lines
Log Message:
perf: Minor fixes/optimizations

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.12 2025/10/16 18:36:23 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 // Get format identifier
56 const char *
57 formstr(int f)
58 {
59 switch (f) {
60 case 'a': return("ascii");
61 case 'f': return("float");
62 case 'd': return("double");
63 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64 }
65 return("unknown");
66 }
67
68 // Our default data share function
69 RdataShare *
70 defDataShare(const char *name, RCOutputOp op, size_t siz)
71 {
72 return new RdataShareMap(name, RSDOflags[op], siz);
73 }
74
75 // Allocate rcontrib accumulator
76 RcontribMod *
77 NewRcMod(const char *prms, const char *binexpr, int ncbins)
78 {
79 if (!prms) prms = "";
80 if ((ncbins > 1) & !binexpr) {
81 error(USER, "missing bin expression");
82 return NULL;
83 }
84 if (ncbins < 1) ncbins = 1;
85
86 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
87 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
88 strlen(prms)+1);
89
90 if (binexpr) { // get/check bin expression
91 mp->binv = eparse(const_cast<char *>(binexpr));
92 if (mp->binv->type == NUM) { // constant expression (0)?
93 if ((int)evalue(mp->binv) != 0) {
94 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
95 binexpr);
96 error(USER, errmsg);
97 }
98 if (ncbins > 1) {
99 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
100 error(USER, errmsg);
101 }
102 epfree(mp->binv, true);
103 mp->binv = NULL;
104 prms = "";
105 ncbins = 1;
106 }
107 }
108 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
109 mp->nbins = ncbins;
110 return mp;
111 }
112
113 // Free an RcontribMod
114 void
115 FreeRcMod(void *p)
116 {
117 if (!p) return;
118 EPNODE * bep = (*(RcontribMod *)p).binv;
119 if (bep) epfree(bep, true);
120 efree(p);
121 }
122
123 // Set output format ('f', 'd', or 'c')
124 bool
125 RcontribSimulManager::SetDataFormat(int ty)
126 {
127 if (outList) {
128 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
129 return false;
130 }
131 switch (ty) {
132 case 'f':
133 dsiz = sizeof(float)*NCSAMP;
134 break;
135 case 'd':
136 dsiz = sizeof(double)*NCSAMP;
137 break;
138 case 'c':
139 dsiz = LSCOLR;
140 break;
141 default:
142 sprintf(errmsg, "unsupported output format '%c'", ty);
143 error(INTERNAL, errmsg);
144 return false;
145 }
146 dtyp = ty;
147 return true;
148 }
149
150 // static call-back for rcontrib ray-tracing
151 int
152 RcontribSimulManager::RctCall(RAY *r, void *cd)
153 {
154 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
155 return 0;
156 // shadow ray not on source?
157 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
158 return 0;
159
160 const char * mname = objptr(r->ro->omod)->oname;
161 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
162 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
163 if (!mp)
164 return 0; // not in our modifier list
165
166 int bi = 0; // get bin index
167 if (mp->binv) {
168 worldfunc(RCCONTEXT, r);
169 set_eparams(mp->params);
170 double bval = evalue(mp->binv);
171 if (bval <= -.5)
172 return 0; // silently ignore negative bin index
173 bi = int(bval + .5);
174 }
175 DCOLORV * dvp = (*mp)[bi];
176 if (!dvp) {
177 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
178 error(WARNING, errmsg);
179 return 0;
180 }
181 SCOLOR contr;
182 raycontrib(contr, r, PRIMARY); // compute coefficient
183 if (rcp->HasFlag(RCcontrib))
184 smultscolor(contr, r->rcol); // -> value contribution
185
186 for (int i = 0; i < NCSAMP; i++)
187 *dvp++ += contr[i]; // accumulate color/spectrum
188 return 1;
189 }
190
// Look for a printf-style conversion in tst whose conversion character is
// one of those in match.  Returns the index of that conversion character,
// or 0 if there is none (a match can never be at index 0, since a '%'
// must precede it).
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	while (*cp != '\0') {
		if (*cp++ != '%')
			continue;
					// skip flags/width/precision; stops at NUL too
		cp += strcspn(cp, convChars);
					// note: NUL always "matches", ending the scan
		if (strchr(match, *cp) != NULL)
			break;
		cp++;
	}
	if (*cp == '\0')
		return 0;
	return int(cp - tst);
}
208
209 // Add a modifier and arguments, opening output file(s)
210 bool
211 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
212 const char *prms, const char *binval, int bincnt)
213 {
214 if (!modn | !outspec || !*modn | !*outspec) {
215 error(WARNING, "ignoring bad call to AddModifier()");
216 return false;
217 }
218 if (*outspec == '!') {
219 error(USER, "command output not supported by RcontribSimulManager");
220 return false;
221 }
222 if (!nChan) { // initial call?
223 if ((xres < 0) | (yres <= 0)) {
224 error(USER, "xres, yres must be set before first modifier");
225 return false;
226 }
227 if (!SetDataFormat(dtyp))
228 return false;
229 nChan = NCSAMP;
230 } else if (nChan != NCSAMP) {
231 error(USER, "# spectral channels must be fixed in AddModifier()");
232 return false;
233 }
234 if (Ready()) {
235 error(INTERNAL, "call to AddModifier() after PrepOutput()");
236 return false;
237 }
238 LUENT * lp = lu_find(&modLUT, modn);
239 if (lp->data) {
240 sprintf(errmsg, "duplicate modifier '%s'", modn);
241 error(USER, errmsg);
242 return false;
243 }
244 if (!lp->key) // create new entry
245 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
246
247 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
248 if (!mp) return false;
249 lp->data = (char *)mp;
250 const int modndx = hasFormat(outspec, "sb");
251 const int binndx = hasFormat(outspec, "diouxX");
252 int bin0 = 0;
253 char fnbuf[512];
254 if (!modndx || (binndx > 0) & (modndx > binndx))
255 sprintf(fnbuf, outspec, bin0, modn);
256 else
257 sprintf(fnbuf, outspec, modn, bin0);
258 RcontribOutput * olast = NULL;
259 RcontribOutput * op;
260 for (op = outList; op; op = (olast=op)->next) {
261 if (strcmp(op->GetName(), fnbuf))
262 continue;
263 if (modndx) { // this ain't right
264 sprintf(errmsg, "output name collision for '%s'", fnbuf);
265 error(USER, errmsg);
266 return false;
267 }
268 if ((binndx > 0) & (xres > 0)) {
269 sprintf(fnbuf, outspec, ++bin0);
270 continue; // each bin goes to another image
271 }
272 mp->coffset = op->rowBytes; // else add to what's there
273 break;
274 }
275 if (!op) { // create new output channel?
276 op = new RcontribOutput(fnbuf);
277 if (olast) olast->next = op;
278 else outList = op;
279 }
280 mp->opl = op; // first (maybe only) output channel
281 if (modndx) // remember modifier if part of name
282 op->omod = lp->key;
283 if (binndx) { // append output image/bin list
284 op->rowBytes += dsiz;
285 op->obin = bin0;
286 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
287 if (!modndx || (binndx > 0) & (modndx > binndx))
288 sprintf(fnbuf, outspec, bin0+bincnt, modn);
289 else
290 sprintf(fnbuf, outspec, modn, bin0+bincnt);
291 if (!op->next) {
292 olast = op;
293 olast->next = op = new RcontribOutput(fnbuf);
294 if (modndx) op->omod = lp->key;
295 op->obin = bin0+bincnt;
296 } else {
297 op = op->next;
298 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
299 "bin number mismatch in AddModifier()");
300 }
301 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
302 "row offset mismatch in AddModifier()");
303 op->rowBytes += dsiz;
304 }
305 } else // else send all results to this channel
306 op->rowBytes += mp->nbins*dsiz;
307 return true;
308 }
309
310 // Add a file of modifiers with associated arguments
311 bool
312 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
313 const char *prms,
314 const char *binval, int bincnt)
315 {
316 char * path = getpath(const_cast<char *>(modfn),
317 getrlibpath(), R_OK);
318 FILE * fp;
319 if (!path || !(fp = fopen(path, "r"))) {
320 sprintf(errmsg, "cannot %s modifier file '%s'",
321 path ? "open" : "find", modfn);
322 error(SYSTEM, errmsg);
323 return false;
324 }
325 char mod[MAXSTR];
326 while (fgetword(mod, sizeof(mod), fp))
327 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
328 fclose(fp);
329 return false;
330 }
331 fclose(fp);
332 return true;
333 }
334
335 // call-back to check if modifier has been loaded
336 static int
337 checkModExists(const LUENT *lp, void *p)
338 {
339 OBJECT mod = modifier(lp->key);
340
341 if ((mod != OVOID) & (mod < nsceneobjs))
342 return 1;
343
344 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
345 error(WARNING, errmsg);
346 return 0;
347 }
348
349 // Prepare output channels and return # completed rows
350 int
351 RcontribSimulManager::PrepOutput()
352 {
353 if (!outList || !RtraceSimulManager::Ready()) {
354 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
355 return -1;
356 }
357 if (!cdsF) {
358 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
359 return -1;
360 }
361 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
362 return -1;
363
364 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
365 int remWarnings = 20;
366 for (RcontribOutput *op = outList; op; op = op->next) {
367 if (op->rData) {
368 error(INTERNAL, "output channel already open in PrepOutput()");
369 return -1;
370 }
371 op->nRows = outList->nRows;
372 op->rData = (*cdsF)(op->ofname, outOp,
373 GetHeadLen()+1024 + op->nRows*op->rowBytes);
374 freeqstr(op->ofname); op->ofname = NULL;
375 if (outOp == RCOrecover) {
376 int rd = op->CheckHeader(this);
377 if (rd < 0)
378 return -1;
379 if (rd >= op->nRows) {
380 if (remWarnings >= 0) {
381 sprintf(errmsg, "recovered output '%s' is complete",
382 op->GetName());
383 error(WARNING, --remWarnings ? errmsg : "etc...");
384 }
385 rd = op->nRows;
386 }
387 if (!rInPos | (rInPos > rd)) rInPos = rd;
388 } else if (!op->NewHeader(this))
389 return -1;
390 // make sure there's room
391 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
392 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
393 return -1; // calls error() for us
394 }
395 rowsDone.NewBitMap(outList->nRows); // create row completion map
396 rowsDone.ClearBits(0, rInPos, true);
397 return rInPos;
398 }
399
400 // Create header in open write-only channel
401 bool
402 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
403 {
404 int headlim = rcp->GetHeadLen() + 1024;
405 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
406 int esiz;
407 const int etyp = rcp->GetFormat(&esiz);
408
409 strcpy(hdr, HDRSTR);
410 strcat(hdr, "RADIANCE\n");
411 begData = strlen(hdr);
412 strcpy(hdr+begData, rcp->GetHeadStr());
413 begData += rcp->GetHeadLen();
414 if (omod) {
415 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
416 begData += strlen(hdr+begData);
417 }
418 if (obin >= 0) {
419 sprintf(hdr+begData, "BIN=%d\n", obin);
420 begData += strlen(hdr+begData);
421 }
422 strcpy(hdr+begData, ROWZEROSTR);
423 rowCountPos = begData+LNROWSTR;
424 begData += sizeof(ROWZEROSTR)-1;
425 if (!rcp->xres | (rowBytes > esiz)) {
426 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
427 begData += strlen(hdr+begData);
428 }
429 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
430 begData += strlen(hdr+begData);
431 if (NCSAMP > 3) {
432 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
433 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
434 begData += strlen(hdr+begData);
435 }
436 if (etyp != 'c') {
437 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
438 begData += strlen(hdr+begData);
439 }
440 int align = 0;
441 switch (etyp) {
442 case 'f':
443 align = sizeof(float);
444 break;
445 case 'd':
446 align = sizeof(double);
447 break;
448 case 'c':
449 break;
450 default:
451 error(INTERNAL, "unsupported data type in NewHeader()");
452 return false;
453 }
454 strcpy(hdr+begData, FMTSTR); // write format string
455 begData += strlen(hdr+begData);
456 strcpy(hdr+begData, formstr(etyp));
457 begData += strlen(hdr+begData);
458 if (align) // align data at end of header
459 while ((begData+2) % align)
460 hdr[begData++] = ' ';
461 hdr[begData++] = '\n'; // EOL for data format
462 hdr[begData++] = '\n'; // end of nominal header
463 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
464 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
465 begData += strlen(hdr+begData);
466 }
467 return rData->ReleaseMemory(hdr, RDSwrite);
468 }
469
470 // find string argument in header for named variable
471 static const char *
472 findArgs(const char *hdr, const char *vnm, int len)
473 {
474 const char * npos = strnstr(hdr, vnm, len);
475
476 if (!npos) return NULL;
477
478 npos += strlen(vnm); // find start of (first) argument
479 len -= npos - hdr;
480 while (len-- > 0 && (*npos == '=') | isspace(*npos))
481 if (*npos++ == '\n')
482 return NULL;
483
484 return (len >= 0) ? npos : NULL;
485 }
486
487 // Load and check header in read/write channel
488 int
489 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
490 {
491 int esiz;
492 const int etyp = rcp->GetFormat(&esiz);
493 const int maxlen = rcp->GetHeadLen() + 1024;
494 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
495 const char * cp;
496 // find end of header
497 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
498 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
499 error(USER, errmsg);
500 return -1;
501 }
502 begData = cp - hdr + 1; // increment again at end
503 // check # components
504 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
505 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
506 error(USER, errmsg);
507 return -1;
508 }
509 // check format
510 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
511 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
512 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
513 error(USER, errmsg);
514 return -1;
515 }
516 // check #columns
517 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
518 !rcp->xres | (rowBytes > esiz)) {
519 sprintf(errmsg, "expected NCOLS=%d in '%s'",
520 int(rowBytes/esiz), GetName());
521 error(USER, errmsg);
522 return -1;
523 }
524 // find row count
525 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
526 sprintf(errmsg, "missing NROWS in '%s'", GetName());
527 error(USER, errmsg);
528 return -1;
529 }
530 rowCountPos = cp - hdr;
531 int rlast = atoi(cp);
532 begData++; // advance past closing EOL
533 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
534 char rbuf[64];
535 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
536 int rlen = strlen(rbuf);
537 if (strncmp(rbuf, hdr+begData, rlen)) {
538 sprintf(errmsg, "bad resolution string in '%s'", GetName());
539 error(USER, errmsg);
540 return -1;
541 }
542 begData += rlen;
543 }
544 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
545 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
546 }
547
548 // Rewind calculation (previous results unchanged)
549 bool
550 RcontribSimulManager::ResetRow(int r)
551 {
552 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
553 error(WARNING, "ignoring bad call to ResetRow()");
554 return (r == rInPos);
555 }
556 FlushQueue(); // finish current and reset
557 for (RcontribOutput *op = outList; op; op = op->next)
558 if (!op->SetRowsDone(r))
559 return false;
560
561 rowsDone.ClearBits(r, rInPos-r, false);
562 rInPos = r;
563 return true;
564 }
565
566 // call-back for averaging each modifier's contributions to assigned channel(s)
567 static int
568 putModContrib(const LUENT *lp, void *p)
569 {
570 RcontribMod * mp = (RcontribMod *)lp->data;
571 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
572 const double sca = 1./rcp->accum;
573 const DCOLORV * dvp = mp->cbin;
574 RcontribOutput * op = mp->opl;
575 int i, n;
576
577 switch (rcp->GetFormat()) { // conversion based on output type
578 case 'd': {
579 double * dvo = (double *)op->InsertionP(mp->coffset);
580 for (n = mp->nbins; n--; ) {
581 for (i = 0; i < NCSAMP; i++)
582 *dvo++ = *dvp++ * sca;
583 if ((op->obin >= 0) & (n > 0))
584 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
585 }
586 } break;
587 case 'f': {
588 float * fvo = (float *)op->InsertionP(mp->coffset);
589 for (n = mp->nbins; n--; ) {
590 for (i = 0; i < NCSAMP; i++)
591 *fvo++ = float(*dvp++ * sca);
592 if ((op->obin >= 0) & (n > 0))
593 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
594 }
595 } break;
596 case 'c': {
597 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
598 for (n = mp->nbins; n--; ) {
599 SCOLOR scol;
600 for (i = 0; i < NCSAMP; i++)
601 scol[i] = COLORV(*dvp++ * sca);
602 scolor_scolr(cvo, scol);
603 cvo += LSCOLR;
604 if ((op->obin >= 0) & (n > 0))
605 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
606 }
607 } break;
608 default:
609 error(CONSISTENCY, "unsupported output type in putModContrib()");
610 return -1;
611 }
612 // clear for next tally
613 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
614 return 1;
615 }
616
617 // Add a ray/bundle to compute next record
618 int
619 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
620 {
621 if (!Ready())
622 return 0;
623 if (rInPos >= outList->nRows) {
624 error(WARNING, "ComputeRecord() called after last record");
625 return 0;
626 }
627 if (nkids > 0) { // in parent process?
628 int k = GetChild(); // updates output rows
629 if (k < 0) return -1; // can't really happen
630 RowAssignment rass;
631 rass.row = kidRow[k] = rInPos++;
632 rass.ac = accum;
633 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
634 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
635 != sizeof(FVECT)*2*accum) {
636 error(SYSTEM, "cannot write to child; dead process?");
637 return -1;
638 }
639 return accum; // tracing/output happens in child
640 }
641 if (NCSAMP != nChan) {
642 error(INTERNAL, "number of color channels must remain fixed");
643 return -1;
644 }
645 // actual work is done here...
646 if (EnqueueBundle(orig_direc, accum) < 0)
647 return -1;
648
649 RcontribOutput * op; // prepare output buffers
650 for (op = outList; op; op = op->next)
651 if (!op->GetRow(rInPos))
652 return -1;
653 // convert averages & clear
654 if (lu_doall(&modLUT, putModContrib, this) < 0)
655 return -1;
656 // write buffers
657 for (op = outList; op; op = op->next)
658 op->DoneRow();
659
660 if (!nkids) // update row if solo process
661 UpdateRowsDone(rInPos++); // XXX increment here is critical
662
663 return accum;
664 }
665
666 // Get next available child, returning index or -1 if forceWait & idling
667 int
668 RcontribSimulManager::GetChild(bool forceWait)
669 {
670 if (nkids <= 0)
671 return -1;
672 // take inventory
673 int pn, n = 0;
674 fd_set writeset, errset;
675 FD_ZERO(&writeset); FD_ZERO(&errset);
676 for (pn = nkids; pn--; ) {
677 if (kidRow[pn] < 0) { // child already ready?
678 if (forceWait) continue;
679 return pn; // good enough
680 }
681 FD_SET(kid[pn].w, &writeset); // will check on this one
682 FD_SET(kid[pn].w, &errset);
683 if (kid[pn].w >= n)
684 n = kid[pn].w + 1;
685 }
686 if (!n) // every child is idle?
687 return -1;
688 // wait on "busy" child(ren)
689 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
690 if (errno != EINTR) {
691 error(SYSTEM, "select call failed in GetChild()");
692 return -1;
693 }
694 pn = -1; // get flags set by select
695 for (n = nkids; n--; )
696 if (kidRow[n] >= 0 &&
697 FD_ISSET(kid[n].w, &writeset) |
698 FD_ISSET(kid[n].w, &errset)) {
699 // update output row counts
700 if (!FD_ISSET(kid[n].w, &errset))
701 UpdateRowsDone(kidRow[n]);
702 kidRow[n] = -1; // flag it available
703 pn = n;
704 }
705 return pn;
706 }
707
708 // Update row completion bitmap and update outputs (does not change rInPos)
709 bool
710 RcontribSimulManager::UpdateRowsDone(int r)
711 {
712 if (!rowsDone.TestAndSet(r)) {
713 error(WARNING, "redundant call to UpdateRowsDone()");
714 return false;
715 }
716 int nDone = GetRowFinished();
717 if (nDone <= r)
718 return true; // nothing to update, yet
719 for (RcontribOutput *op = outList; op; op = op->next)
720 if (!op->SetRowsDone(nDone))
721 return false;
722 return true; // up-to-date
723 }
724
725 // Run rcontrib child process (never returns)
726 void
727 RcontribSimulManager::RunChild()
728 {
729 FVECT * vecList = NULL;
730 RowAssignment rass;
731 ssize_t nr;
732
733 accum = 0;
734 errno = 0;
735 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
736 if (!rass.ac) {
737 error(CONSISTENCY, "bad accumulator count in child");
738 exit(1);
739 }
740 if (rass.ac > accum) {
741 efree(vecList);
742 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
743 }
744 accum = rass.ac;
745 rInPos = rass.row;
746
747 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
748 sizeof(FVECT)*2*accum)
749 break;
750
751 if (ComputeRecord(vecList) <= 0)
752 exit(1);
753 }
754 if (nr) {
755 error(SYSTEM, "read error in child process");
756 exit(1);
757 }
758 exit(0);
759 }
760
761 // Add child processes as indicated
762 bool
763 RcontribSimulManager::StartKids(int n2go)
764 {
765 if ((n2go <= 0) | (nkids + n2go <= 1))
766 return false;
767
768 if (!nkids)
769 cow_memshare(); // preload objects
770
771 n2go += nkids; // => desired brood size
772 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
773 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
774
775 fflush(stdout); // shouldn't use, anyway
776 while (nkids < n2go) {
777 kid[nkids] = sp_inactive;
778 kid[nkids].w = dup(1);
779 kid[nkids].flags |= PF_FILT_OUT;
780 int rv = open_process(&kid[nkids], NULL);
781 if (!rv) { // in child process?
782 while (nkids-- > 0)
783 close(kid[nkids].w);
784 free(kid); free(kidRow);
785 kid = NULL; kidRow = NULL;
786 RunChild(); // should never return
787 _exit(1);
788 }
789 if (rv < 0) {
790 error(SYSTEM, "cannot fork worker process");
791 return false;
792 }
793 kidRow[nkids++] = -1; // newborn is ready
794 }
795 return true;
796 }
797
798 // Reap the indicated number of children (all if 0)
799 int
800 RcontribSimulManager::StopKids(int n2end)
801 {
802 if (nkids <= 0)
803 return 0;
804 FlushQueue();
805 int status = 0;
806 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
807 status = close_processes(kid, nkids);
808 free(kid); free(kidRow);
809 kid = NULL; kidRow = NULL;
810 nkids = 0;
811 } else { // else shrink family
812 int st;
813 while (n2end-- > 0)
814 if ((st = close_process(&kid[--nkids])) > 0)
815 status = st;
816 // could call realloc(), but it hardly matters
817 }
818 if (status) {
819 sprintf(errmsg, "non-zero (%d) status from child", status);
820 error(WARNING, errmsg);
821 }
822 return status;
823 }
824
825 // Set number of computation threads (0 => #cores)
826 int
827 RcontribSimulManager::SetThreadCount(int nt)
828 {
829 if (!Ready()) {
830 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
831 return 0;
832 }
833 if (nt < 0)
834 return NThreads();
835 if (!nt) nt = GetNCores();
836 int status = 0;
837 if (nt == 1)
838 status = StopKids();
839 else if (nt < nkids)
840 status = StopKids(nkids-nt);
841 else if (nt > nkids)
842 StartKids(nt-nkids);
843 if (status) {
844 sprintf(errmsg, "non-zero (%d) status from child", status);
845 error(WARNING, errmsg);
846 }
847 return NThreads();
848 }