ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.11
Committed: Thu Jan 2 16:16:49 2025 UTC (3 months, 4 weeks ago) by greg
Branch: MAIN
CVS Tags: HEAD
Changes since 2.10: +18 -13 lines
Log Message:
fix(rxtrace,rxcontrib): Improvements and bug fixes in flag initialization, handling of light source tracing

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.10 2024/12/10 00:38:59 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 // Get format identifier
56 const char *
57 formstr(int f)
58 {
59 switch (f) {
60 case 'a': return("ascii");
61 case 'f': return("float");
62 case 'd': return("double");
63 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64 }
65 return("unknown");
66 }
67
68 // Our default data share function
69 RdataShare *
70 defDataShare(const char *name, RCOutputOp op, size_t siz)
71 {
72 return new RdataShareMap(name, RSDOflags[op], siz);
73 }
74
75 // Allocate rcontrib accumulator
76 RcontribMod *
77 NewRcMod(const char *prms, const char *binexpr, int ncbins)
78 {
79 if (!prms) prms = "";
80 if ((ncbins > 1) & !binexpr) {
81 error(USER, "missing bin expression");
82 return NULL;
83 }
84 if (ncbins < 1) ncbins = 1;
85
86 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
87 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
88 strlen(prms)+1);
89
90 if (binexpr) { // get/check bin expression
91 mp->binv = eparse(const_cast<char *>(binexpr));
92 if (mp->binv->type == NUM) { // constant expression (0)?
93 if ((int)evalue(mp->binv) != 0) {
94 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
95 binexpr);
96 error(USER, errmsg);
97 }
98 if (ncbins > 1) {
99 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
100 error(USER, errmsg);
101 }
102 epfree(mp->binv, true);
103 mp->binv = NULL;
104 prms = "";
105 ncbins = 1;
106 }
107 }
108 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
109 mp->nbins = ncbins;
110 return mp;
111 }
112
113 // Free an RcontribMod
114 void
115 FreeRcMod(void *p)
116 {
117 if (!p) return;
118 EPNODE * bep = (*(RcontribMod *)p).binv;
119 if (bep) epfree(bep, true);
120 efree(p);
121 }
122
123 // Set output format ('f', 'd', or 'c')
124 bool
125 RcontribSimulManager::SetDataFormat(int ty)
126 {
127 if (outList) {
128 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
129 return false;
130 }
131 switch (ty) {
132 case 'f':
133 dsiz = sizeof(float)*NCSAMP;
134 break;
135 case 'd':
136 dsiz = sizeof(double)*NCSAMP;
137 break;
138 case 'c':
139 dsiz = LSCOLR;
140 break;
141 default:
142 sprintf(errmsg, "unsupported output format '%c'", ty);
143 error(INTERNAL, errmsg);
144 return false;
145 }
146 dtyp = ty;
147 return true;
148 }
149
150 // static call-back for rcontrib ray-tracing
151 int
152 RcontribSimulManager::RctCall(RAY *r, void *cd)
153 {
154 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
155 return 0;
156 // shadow ray not on source?
157 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
158 return 0;
159
160 const char * mname = objptr(r->ro->omod)->oname;
161 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
162 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
163 if (!mp)
164 return 0; // not in our modifier list
165
166 int bi = 0; // get bin index
167 if (mp->binv) {
168 worldfunc(RCCONTEXT, r);
169 set_eparams(mp->params);
170 double bval = evalue(mp->binv);
171 if (bval <= -.5)
172 return 0; // silently ignore negative bin index
173 bi = int(bval + .5);
174 }
175 DCOLORV * dvp = (*mp)[bi];
176 if (!dvp) {
177 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
178 error(WARNING, errmsg);
179 return 0;
180 }
181 SCOLOR contr;
182 raycontrib(contr, r, PRIMARY); // compute coefficient
183 if (rcp->HasFlag(RCcontrib))
184 smultscolor(contr, r->rcol); // -> value contribution
185
186 for (int i = 0; i < NCSAMP; i++)
187 *dvp++ += contr[i]; // accumulate color/spectrum
188 return 1;
189 }
190
// Look for a printf-style conversion in tst whose type character is in match
// Returns the index of that type character, or 0 if there is none
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	while ((cp = strchr(cp, '%')) != NULL) {
		do			// skip '%', then any flags, width, precision
			++cp;
		while (!strchr(convChars, *cp));	// also stops at terminator
		if (*cp == '\0')
			break;		// incomplete conversion at end
		if (strchr(match, *cp))
			return int(cp - tst);
		++cp;			// some other conversion, keep looking
	}
	return 0;
}
208
209 // Add a modifier and arguments, opening output file(s)
210 bool
211 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
212 const char *prms, const char *binval, int bincnt)
213 {
214 if (!modn | !outspec || !*modn | !*outspec) {
215 error(WARNING, "ignoring bad call to AddModifier()");
216 return false;
217 }
218 if (!nChan) { // initial call?
219 if ((xres < 0) | (yres <= 0)) {
220 error(USER, "xres, yres must be set before first modifier");
221 return false;
222 }
223 if (!SetDataFormat(dtyp))
224 return false;
225 nChan = NCSAMP;
226 } else if (nChan != NCSAMP) {
227 error(USER, "# spectral channels must be fixed in AddModifier()");
228 return false;
229 }
230 if (Ready()) {
231 error(INTERNAL, "call to AddModifier() after PrepOutput()");
232 return false;
233 }
234 LUENT * lp = lu_find(&modLUT, modn);
235 if (lp->data) {
236 sprintf(errmsg, "duplicate modifier '%s'", modn);
237 error(USER, errmsg);
238 return false;
239 }
240 if (!lp->key) // create new entry
241 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
242
243 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
244 if (!mp) return false;
245 lp->data = (char *)mp;
246 const int modndx = hasFormat(outspec, "sb");
247 const int binndx = hasFormat(outspec, "diouxX");
248 int bin0 = 0;
249 char fnbuf[512];
250 if (!modndx || (binndx > 0) & (modndx > binndx))
251 sprintf(fnbuf, outspec, bin0, modn);
252 else
253 sprintf(fnbuf, outspec, modn, bin0);
254 RcontribOutput * olast = NULL;
255 RcontribOutput * op;
256 for (op = outList; op; op = (olast=op)->next) {
257 if (strcmp(op->GetName(), fnbuf))
258 continue;
259 if (modndx) { // this ain't right
260 sprintf(errmsg, "output name collision for '%s'", fnbuf);
261 error(USER, errmsg);
262 return false;
263 }
264 if ((binndx > 0) & (xres > 0)) {
265 sprintf(fnbuf, outspec, ++bin0);
266 continue; // each bin goes to another image
267 }
268 mp->coffset = op->rowBytes; // else add to what's there
269 break;
270 }
271 if (!op) { // create new output channel?
272 op = new RcontribOutput(fnbuf);
273 if (olast) olast->next = op;
274 else outList = op;
275 }
276 mp->opl = op; // first (maybe only) output channel
277 if (modndx) // remember modifier if part of name
278 op->omod = lp->key;
279 if (binndx) { // append output image/bin list
280 op->rowBytes += dsiz;
281 op->obin = bin0;
282 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
283 if (!modndx || (binndx > 0) & (modndx > binndx))
284 sprintf(fnbuf, outspec, bin0+bincnt, modn);
285 else
286 sprintf(fnbuf, outspec, modn, bin0+bincnt);
287 if (!op->next) {
288 olast = op;
289 olast->next = op = new RcontribOutput(fnbuf);
290 if (modndx) op->omod = lp->key;
291 op->obin = bin0+bincnt;
292 } else {
293 op = op->next;
294 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
295 "bin number mismatch in AddModifier()");
296 }
297 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
298 "row offset mismatch in AddModifier()");
299 op->rowBytes += dsiz;
300 }
301 } else // else send all results to this channel
302 op->rowBytes += mp->nbins*dsiz;
303 return true;
304 }
305
306 // Add a file of modifiers with associated arguments
307 bool
308 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
309 const char *prms,
310 const char *binval, int bincnt)
311 {
312 char * path = getpath(const_cast<char *>(modfn),
313 getrlibpath(), R_OK);
314 FILE * fp;
315 if (!path || !(fp = fopen(path, "r"))) {
316 sprintf(errmsg, "cannot %s modifier file '%s'",
317 path ? "open" : "find", modfn);
318 error(SYSTEM, errmsg);
319 return false;
320 }
321 char mod[MAXSTR];
322 while (fgetword(mod, sizeof(mod), fp))
323 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
324 fclose(fp);
325 return false;
326 }
327 fclose(fp);
328 return true;
329 }
330
331 // call-back to check if modifier has been loaded
332 static int
333 checkModExists(const LUENT *lp, void *p)
334 {
335 OBJECT mod = modifier(lp->key);
336
337 if ((mod != OVOID) & (mod < nsceneobjs))
338 return 1;
339
340 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
341 error(WARNING, errmsg);
342 return 0;
343 }
344
345 // Prepare output channels and return # completed rows
346 int
347 RcontribSimulManager::PrepOutput()
348 {
349 if (!outList || !RtraceSimulManager::Ready()) {
350 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
351 return -1;
352 }
353 if (!cdsF) {
354 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
355 return -1;
356 }
357 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
358 return -1;
359
360 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
361 int remWarnings = 20;
362 for (RcontribOutput *op = outList; op; op = op->next) {
363 if (op->rData) {
364 error(INTERNAL, "output channel already open in PrepOutput()");
365 return -1;
366 }
367 op->nRows = outList->nRows;
368 op->rData = (*cdsF)(op->ofname, outOp,
369 GetHeadLen()+1024 + op->nRows*op->rowBytes);
370 freeqstr(op->ofname); op->ofname = NULL;
371 if (outOp == RCOrecover) {
372 int rd = op->CheckHeader(this);
373 if (rd < 0)
374 return -1;
375 if (rd >= op->nRows) {
376 if (remWarnings >= 0) {
377 sprintf(errmsg, "recovered output '%s' is complete",
378 op->GetName());
379 error(WARNING, --remWarnings ? errmsg : "etc...");
380 }
381 rd = op->nRows;
382 }
383 if (!rInPos | (rInPos > rd)) rInPos = rd;
384 } else if (!op->NewHeader(this))
385 return -1;
386 // make sure there's room
387 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
388 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
389 return -1; // calls error() for us
390 }
391 rowsDone.NewBitMap(outList->nRows); // create row completion map
392 rowsDone.ClearBits(0, rInPos, true);
393 return rInPos;
394 }
395
396 // Create header in open write-only channel
397 bool
398 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
399 {
400 int headlim = rcp->GetHeadLen() + 1024;
401 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
402 int esiz;
403 const int etyp = rcp->GetFormat(&esiz);
404
405 strcpy(hdr, HDRSTR);
406 strcat(hdr, "RADIANCE\n");
407 begData = strlen(hdr);
408 strcpy(hdr+begData, rcp->GetHeadStr());
409 begData += rcp->GetHeadLen();
410 if (omod) {
411 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
412 begData += strlen(hdr+begData);
413 }
414 if (obin >= 0) {
415 sprintf(hdr+begData, "BIN=%d\n", obin);
416 begData += strlen(hdr+begData);
417 }
418 strcpy(hdr+begData, ROWZEROSTR);
419 rowCountPos = begData+LNROWSTR;
420 begData += sizeof(ROWZEROSTR)-1;
421 if (!rcp->xres | (rowBytes > esiz)) {
422 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
423 begData += strlen(hdr+begData);
424 }
425 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
426 begData += strlen(hdr+begData);
427 if (NCSAMP > 3) {
428 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
429 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
430 begData += strlen(hdr+begData);
431 }
432 if (etyp != 'c') {
433 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
434 begData += strlen(hdr+begData);
435 }
436 int align = 0;
437 switch (etyp) {
438 case 'f':
439 align = sizeof(float);
440 break;
441 case 'd':
442 align = sizeof(double);
443 break;
444 case 'c':
445 break;
446 default:
447 error(INTERNAL, "unsupported data type in NewHeader()");
448 return false;
449 }
450 strcpy(hdr+begData, FMTSTR); // write format string
451 begData += strlen(hdr+begData);
452 strcpy(hdr+begData, formstr(etyp));
453 begData += strlen(hdr+begData);
454 if (align) // align data at end of header
455 while ((begData+2) % align)
456 hdr[begData++] = ' ';
457 hdr[begData++] = '\n'; // EOL for data format
458 hdr[begData++] = '\n'; // end of nominal header
459 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
460 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
461 begData += strlen(hdr+begData);
462 }
463 return rData->ReleaseMemory(hdr, RDSwrite);
464 }
465
466 // find string argument in header for named variable
467 static const char *
468 findArgs(const char *hdr, const char *vnm, int len)
469 {
470 const char * npos = strnstr(hdr, vnm, len);
471
472 if (!npos) return NULL;
473
474 npos += strlen(vnm); // find start of (first) argument
475 len -= npos - hdr;
476 while (len-- > 0 && (*npos == '=') | isspace(*npos))
477 if (*npos++ == '\n')
478 return NULL;
479
480 return (len >= 0) ? npos : NULL;
481 }
482
483 // Load and check header in read/write channel
484 int
485 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
486 {
487 int esiz;
488 const int etyp = rcp->GetFormat(&esiz);
489 const int maxlen = rcp->GetHeadLen() + 1024;
490 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
491 const char * cp;
492 // find end of header
493 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
494 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
495 error(USER, errmsg);
496 return -1;
497 }
498 begData = cp - hdr + 1; // increment again at end
499 // check # components
500 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
501 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
502 error(USER, errmsg);
503 return -1;
504 }
505 // check format
506 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
507 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
508 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
509 error(USER, errmsg);
510 return -1;
511 }
512 // check #columns
513 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
514 !rcp->xres | (rowBytes > esiz)) {
515 sprintf(errmsg, "expected NCOLS=%d in '%s'",
516 int(rowBytes/esiz), GetName());
517 error(USER, errmsg);
518 return -1;
519 }
520 // find row count
521 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
522 sprintf(errmsg, "missing NROWS in '%s'", GetName());
523 error(USER, errmsg);
524 return -1;
525 }
526 rowCountPos = cp - hdr;
527 int rlast = atoi(cp);
528 begData++; // advance past closing EOL
529 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
530 char rbuf[64];
531 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
532 int rlen = strlen(rbuf);
533 if (strncmp(rbuf, hdr+begData, rlen)) {
534 sprintf(errmsg, "bad resolution string in '%s'", GetName());
535 error(USER, errmsg);
536 return -1;
537 }
538 begData += rlen;
539 }
540 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
541 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
542 }
543
544 // Rewind calculation (previous results unchanged)
545 bool
546 RcontribSimulManager::ResetRow(int r)
547 {
548 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
549 error(WARNING, "ignoring bad call to ResetRow()");
550 return (r == rInPos);
551 }
552 FlushQueue(); // finish current and reset
553 for (RcontribOutput *op = outList; op; op = op->next)
554 if (!op->SetRowsDone(r))
555 return false;
556
557 rowsDone.ClearBits(r, rInPos-r, false);
558 rInPos = r;
559 return true;
560 }
561
562 // call-back for averaging each modifier's contributions to assigned channel(s)
563 static int
564 putModContrib(const LUENT *lp, void *p)
565 {
566 RcontribMod * mp = (RcontribMod *)lp->data;
567 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
568 const double sca = 1./rcp->accum;
569 const DCOLORV * dvp = mp->cbin;
570 RcontribOutput * op = mp->opl;
571 int i, n;
572
573 switch (rcp->GetFormat()) { // conversion based on output type
574 case 'd': {
575 double * dvo = (double *)op->InsertionP(mp->coffset);
576 for (n = mp->nbins; n--; ) {
577 for (i = 0; i < NCSAMP; i++)
578 *dvo++ = *dvp++ * sca;
579 if ((op->obin >= 0) & (n > 0))
580 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
581 }
582 } break;
583 case 'f': {
584 float * fvo = (float *)op->InsertionP(mp->coffset);
585 for (n = mp->nbins; n--; ) {
586 for (i = 0; i < NCSAMP; i++)
587 *fvo++ = float(*dvp++ * sca);
588 if ((op->obin >= 0) & (n > 0))
589 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
590 }
591 } break;
592 case 'c': {
593 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
594 for (n = mp->nbins; n--; ) {
595 SCOLOR scol;
596 for (i = 0; i < NCSAMP; i++)
597 scol[i] = COLORV(*dvp++ * sca);
598 scolor_scolr(cvo, scol);
599 cvo += LSCOLR;
600 if ((op->obin >= 0) & (n > 0))
601 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
602 }
603 } break;
604 default:
605 error(CONSISTENCY, "unsupported output type in sendModContrib()");
606 return -1;
607 }
608 // clear for next tally
609 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
610 return 1;
611 }
612
613 // Add a ray/bundle to compute next record
614 int
615 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
616 {
617 if (!Ready())
618 return 0;
619 if (rInPos >= outList->nRows) {
620 error(WARNING, "ComputeRecord() called after last record");
621 return 0;
622 }
623 if (nkids > 0) { // in parent process?
624 int k = GetChild(); // updates output rows
625 if (k < 0) return -1; // can't really happen
626 RowAssignment rass;
627 rass.row = kidRow[k] = rInPos++;
628 rass.ac = accum;
629 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
630 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
631 != sizeof(FVECT)*2*accum) {
632 error(SYSTEM, "cannot write to child; dead process?");
633 return -1;
634 }
635 return accum; // tracing/output happens in child
636 }
637 if (NCSAMP != nChan) {
638 error(INTERNAL, "number of color channels must remain fixed");
639 return -1;
640 }
641 // actual work is done here...
642 if (EnqueueBundle(orig_direc, accum) < 0)
643 return -1;
644
645 RcontribOutput * op; // prepare output buffers
646 for (op = outList; op; op = op->next)
647 if (!op->GetRow(rInPos))
648 return -1;
649 // convert averages & clear
650 if (lu_doall(&modLUT, putModContrib, this) < 0)
651 return -1;
652 // write buffers
653 for (op = outList; op; op = op->next)
654 op->DoneRow();
655
656 if (!nkids) // update row if solo process
657 UpdateRowsDone(rInPos++); // XXX increment here is critical
658
659 return accum;
660 }
661
662 // Get next available child, returning index or -1 if forceWait & idling
663 int
664 RcontribSimulManager::GetChild(bool forceWait)
665 {
666 if (nkids <= 0)
667 return -1;
668 // take inventory
669 int pn, n = 0;
670 fd_set writeset, errset;
671 FD_ZERO(&writeset); FD_ZERO(&errset);
672 for (pn = nkids; pn--; ) {
673 if (kidRow[pn] < 0) { // child already ready?
674 if (forceWait) continue;
675 return pn; // good enough
676 }
677 FD_SET(kid[pn].w, &writeset); // will check on this one
678 FD_SET(kid[pn].w, &errset);
679 if (kid[pn].w >= n)
680 n = kid[pn].w + 1;
681 }
682 if (!n) // every child is idle?
683 return -1;
684 // wait on "busy" child(ren)
685 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
686 if (errno != EINTR) {
687 error(SYSTEM, "select call failed in GetChild()");
688 return -1;
689 }
690 pn = -1; // get flags set by select
691 for (n = nkids; n--; )
692 if (kidRow[n] >= 0 &&
693 FD_ISSET(kid[n].w, &writeset) |
694 FD_ISSET(kid[n].w, &errset)) {
695 // update output row counts
696 if (!FD_ISSET(kid[n].w, &errset))
697 UpdateRowsDone(kidRow[n]);
698 kidRow[n] = -1; // flag it available
699 pn = n;
700 }
701 return pn;
702 }
703
704 // Update row completion bitmap and update outputs (does not change rInPos)
705 bool
706 RcontribSimulManager::UpdateRowsDone(int r)
707 {
708 if (!rowsDone.TestAndSet(r)) {
709 error(WARNING, "redundant call to UpdateRowsDone()");
710 return false;
711 }
712 int nDone = GetRowFinished();
713 if (nDone <= r)
714 return true; // nothing to update, yet
715 for (RcontribOutput *op = outList; op; op = op->next)
716 if (!op->SetRowsDone(nDone))
717 return false;
718 return true; // up-to-date
719 }
720
721 // Run rcontrib child process (never returns)
722 void
723 RcontribSimulManager::RunChild()
724 {
725 FVECT * vecList = NULL;
726 RowAssignment rass;
727 ssize_t nr;
728
729 accum = 0;
730 errno = 0;
731 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
732 if (!rass.ac) {
733 error(CONSISTENCY, "bad accumulator count in child");
734 exit(1);
735 }
736 if (rass.ac > accum)
737 vecList = (FVECT *)erealloc(vecList,
738 sizeof(FVECT)*2*rass.ac);
739 accum = rass.ac;
740 rInPos = rass.row;
741
742 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
743 sizeof(FVECT)*2*accum)
744 break;
745
746 if (ComputeRecord(vecList) <= 0)
747 exit(1);
748 }
749 if (nr) {
750 error(SYSTEM, "read error in child process");
751 exit(1);
752 }
753 exit(0);
754 }
755
756 // Add child processes as indicated
757 bool
758 RcontribSimulManager::StartKids(int n2go)
759 {
760 if ((n2go <= 0) | (nkids + n2go <= 1))
761 return false;
762
763 if (!nkids)
764 cow_memshare(); // preload objects
765
766 n2go += nkids; // => desired brood size
767 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
768 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
769
770 fflush(stdout); // shouldn't use, anyway
771 while (nkids < n2go) {
772 kid[nkids] = sp_inactive;
773 kid[nkids].w = dup(1);
774 kid[nkids].flags |= PF_FILT_OUT;
775 int rv = open_process(&kid[nkids], NULL);
776 if (!rv) { // in child process?
777 while (nkids-- > 0)
778 close(kid[nkids].w);
779 free(kid); free(kidRow);
780 kid = NULL; kidRow = NULL;
781 RunChild(); // should never return
782 _exit(1);
783 }
784 if (rv < 0) {
785 error(SYSTEM, "cannot fork worker process");
786 return false;
787 }
788 kidRow[nkids++] = -1; // newborn is ready
789 }
790 return true;
791 }
792
793 // Reap the indicated number of children (all if 0)
794 int
795 RcontribSimulManager::StopKids(int n2end)
796 {
797 if (nkids <= 0)
798 return 0;
799 FlushQueue();
800 int status = 0;
801 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
802 status = close_processes(kid, nkids);
803 free(kid); free(kidRow);
804 kid = NULL; kidRow = NULL;
805 nkids = 0;
806 } else { // else shrink family
807 int st;
808 while (n2end-- > 0)
809 if ((st = close_process(&kid[--nkids])) > 0)
810 status = st;
811 // could call realloc(), but it hardly matters
812 }
813 if (status) {
814 sprintf(errmsg, "non-zero (%d) status from child", status);
815 error(WARNING, errmsg);
816 }
817 return status;
818 }
819
820 // Set number of computation threads (0 => #cores)
821 int
822 RcontribSimulManager::SetThreadCount(int nt)
823 {
824 if (!Ready()) {
825 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
826 return 0;
827 }
828 if (nt < 0)
829 return NThreads();
830 if (!nt) nt = GetNCores();
831 int status = 0;
832 if (nt == 1)
833 status = StopKids();
834 else if (nt < nkids)
835 status = StopKids(nkids-nt);
836 else if (nt > nkids)
837 StartKids(nt-nkids);
838 if (status) {
839 sprintf(errmsg, "non-zero (%d) status from child", status);
840 error(WARNING, errmsg);
841 }
842 return NThreads();
843 }