ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.10
Committed: Tue Dec 10 00:38:59 2024 UTC (4 months, 3 weeks ago) by greg
Branch: MAIN
Changes since 2.9: +21 -12 lines
Log Message:
fix(rxcontrib): Allow for non-constant expressions for lone bin

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.9 2024/12/03 17:39:42 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 // Get format identifier
56 const char *
57 formstr(int f)
58 {
59 switch (f) {
60 case 'a': return("ascii");
61 case 'f': return("float");
62 case 'd': return("double");
63 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64 }
65 return("unknown");
66 }
67
68 // Our default data share function
69 RdataShare *
70 defDataShare(const char *name, RCOutputOp op, size_t siz)
71 {
72 return new RdataShareMap(name, RSDOflags[op], siz);
73 }
74
75 // Allocate rcontrib accumulator
76 RcontribMod *
77 NewRcMod(const char *prms, const char *binexpr, int ncbins)
78 {
79 if (!prms) prms = "";
80 if ((ncbins > 1) & !binexpr) {
81 error(USER, "missing bin expression");
82 return NULL;
83 }
84 if (ncbins < 1) ncbins = 1;
85
86 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
87 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
88 strlen(prms)+1);
89
90 if (binexpr) { // check bin expression
91 mp->binv = eparse(const_cast<char *>(binexpr));
92 if (mp->binv->type == NUM) { // constant expression (0)?
93 if ((int)(evalue(mp->binv) + .5) > 0) {
94 sprintf(errmsg, "illegal positive constant for bin (%s)",
95 binexpr);
96 error(USER, errmsg);
97 }
98 if (ncbins > 1) {
99 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
100 error(USER, errmsg);
101 }
102 epfree(mp->binv, true);
103 mp->binv = NULL;
104 prms = "";
105 ncbins = 1;
106 }
107 }
108 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
109 mp->nbins = ncbins;
110 return mp;
111 }
112
113 // Free an RcontribMod
114 void
115 FreeRcMod(void *p)
116 {
117 if (!p) return;
118 EPNODE * bep = (*(RcontribMod *)p).binv;
119 if (bep) epfree(bep, true);
120 efree(p);
121 }
122
123 // Set output format ('f', 'd', or 'c')
124 bool
125 RcontribSimulManager::SetDataFormat(int ty)
126 {
127 if (outList) {
128 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
129 return false;
130 }
131 switch (ty) {
132 case 'f':
133 dsiz = sizeof(float)*NCSAMP;
134 break;
135 case 'd':
136 dsiz = sizeof(double)*NCSAMP;
137 break;
138 case 'c':
139 dsiz = LSCOLR;
140 break;
141 default:
142 sprintf(errmsg, "unsupported output format '%c'", ty);
143 error(INTERNAL, errmsg);
144 return false;
145 }
146 dtyp = ty;
147 return true;
148 }
149
150 // static call-back for rcontrib ray-tracing
151 int
152 RcontribSimulManager::RctCall(RAY *r, void *cd)
153 {
154 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
155 return 0;
156 // shadow ray not on source?
157 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
158 return 0;
159
160 const char * mname = objptr(r->ro->omod)->oname;
161 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
162 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
163 if (!mp)
164 return 0; // not in our modifier list
165
166 int bi = 0; // get bin index
167 if (mp->binv) {
168 worldfunc(RCCONTEXT, r);
169 set_eparams(mp->params);
170 double bval = evalue(mp->binv);
171 if (bval <= -.5)
172 return 0; // silently ignore negative bin index
173 bi = int(bval + .5);
174 }
175 DCOLORV * dvp = (*mp)[bi];
176 if (!dvp) {
177 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
178 error(WARNING, errmsg);
179 return 0;
180 }
181 SCOLOR contr;
182 raycontrib(contr, r, PRIMARY); // compute coefficient
183 if (rcp->HasFlag(RCcontrib))
184 smultscolor(contr, r->rcol); // -> value contribution
185 for (int i = 0; i < NCSAMP; i++)
186 *dvp++ += contr[i]; // accumulate color/spectrum
187 return 1;
188 }
189
// Look for a printf-style conversion whose type character is in match.
// Returns the offset in tst of that type character, or 0 if there is none.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	while ((cp = strchr(cp, '%')) != NULL) {
		++cp;					// step over the '%'
		cp += strcspn(cp, convChars);		// skip flags, width, etc.
		if (*cp == '\0')
			return 0;			// incomplete conversion at end
		if (strchr(match, *cp) != NULL)
			return int(cp - tst);		// one of the wanted types
		++cp;					// some other conversion
	}
	return 0;
}
207
208 // Add a modifier and arguments, opening output file(s)
209 bool
210 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
211 const char *prms, const char *binval, int bincnt)
212 {
213 if (!modn | !outspec || !*modn | !*outspec) {
214 error(WARNING, "ignoring bad call to AddModifier()");
215 return false;
216 }
217 if (!nChan) { // initial call?
218 if ((xres < 0) | (yres <= 0)) {
219 error(USER, "xres, yres must be set before first modifier");
220 return false;
221 }
222 if (!SetDataFormat(dtyp))
223 return false;
224 nChan = NCSAMP;
225 } else if (nChan != NCSAMP) {
226 error(USER, "number of spectral channels must be fixed");
227 return false;
228 }
229 if (Ready()) {
230 error(INTERNAL, "call to AddModifier() after PrepOutput()");
231 return false;
232 }
233 LUENT * lp = lu_find(&modLUT, modn);
234 if (lp->data) {
235 sprintf(errmsg, "duplicate modifier '%s'", modn);
236 error(USER, errmsg);
237 return false;
238 }
239 if (!lp->key) // create new entry
240 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
241
242 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
243 if (!mp) return false;
244 lp->data = (char *)mp;
245 const int modndx = hasFormat(outspec, "sb");
246 const int binndx = hasFormat(outspec, "diouxX");
247 int bin0 = 0;
248 char fnbuf[512];
249 if (!modndx || (binndx > 0) & (modndx > binndx))
250 sprintf(fnbuf, outspec, bin0, modn);
251 else
252 sprintf(fnbuf, outspec, modn, bin0);
253 RcontribOutput * olast = NULL;
254 RcontribOutput * op;
255 for (op = outList; op; op = (olast=op)->next) {
256 if (strcmp(op->GetName(), fnbuf))
257 continue;
258 if (modndx) { // this ain't right
259 sprintf(errmsg, "output name collision for '%s'", fnbuf);
260 error(USER, errmsg);
261 return false;
262 }
263 if ((binndx > 0) & (xres > 0)) {
264 sprintf(fnbuf, outspec, ++bin0);
265 continue; // each bin goes to another image
266 }
267 mp->coffset = op->rowBytes; // else add to what's there
268 break;
269 }
270 if (!op) { // create new output channel?
271 op = new RcontribOutput(fnbuf);
272 if (olast) olast->next = op;
273 else outList = op;
274 }
275 mp->opl = op; // first (maybe only) output channel
276 if (modndx) // remember modifier if part of name
277 op->omod = lp->key;
278 if (binndx) { // append output image/bin list
279 op->rowBytes += dsiz;
280 op->obin = bin0;
281 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
282 if (!modndx || (binndx > 0) & (modndx > binndx))
283 sprintf(fnbuf, outspec, bin0+bincnt, modn);
284 else
285 sprintf(fnbuf, outspec, modn, bin0+bincnt);
286 if (!op->next) {
287 olast = op;
288 olast->next = op = new RcontribOutput(fnbuf);
289 if (modndx) op->omod = lp->key;
290 op->obin = bin0+bincnt;
291 } else {
292 op = op->next;
293 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
294 "bin number mismatch in AddModifier()");
295 }
296 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
297 "row offset mismatch in AddModifier()");
298 op->rowBytes += dsiz;
299 }
300 } else // else send all results to this channel
301 op->rowBytes += mp->nbins*dsiz;
302 return true;
303 }
304
305 // Add a file of modifiers with associated arguments
306 bool
307 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
308 const char *prms,
309 const char *binval, int bincnt)
310 {
311 char * path = getpath(const_cast<char *>(modfn),
312 getrlibpath(), R_OK);
313 FILE * fp;
314 if (!path || !(fp = fopen(path, "r"))) {
315 sprintf(errmsg, "cannot %s modifier file '%s'",
316 path ? "open" : "find", modfn);
317 error(SYSTEM, errmsg);
318 return false;
319 }
320 char mod[MAXSTR];
321 while (fgetword(mod, sizeof(mod), fp))
322 if (!AddModifier(mod, outspec, prms, binval, bincnt))
323 return false;
324 fclose(fp);
325 return true;
326 }
327
328 // call-back to check if modifier has been loaded
329 static int
330 checkModExists(const LUENT *lp, void *p)
331 {
332 if (modifier(lp->key) != OVOID)
333 return 1;
334
335 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
336 error(WARNING, errmsg);
337 return 0;
338 }
339
340 // Prepare output channels and return # completed rows
341 int
342 RcontribSimulManager::PrepOutput()
343 {
344 if (!outList || !RtraceSimulManager::Ready()) {
345 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
346 return -1;
347 }
348 if (!cdsF) {
349 error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
350 return -1;
351 }
352 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
353 return -1;
354
355 int remWarnings = 20;
356 for (RcontribOutput *op = outList; op; op = op->next) {
357 if (op->rData) {
358 error(INTERNAL, "output channel already open in PrepOutput()");
359 return -1;
360 }
361 op->nRows = yres * (xres + !xres);
362 op->rData = (*cdsF)(op->ofname, outOp,
363 GetHeadLen()+1024 + op->nRows*op->rowBytes);
364 freeqstr(op->ofname); op->ofname = NULL;
365 if (outOp == RCOrecover) {
366 int rd = op->CheckHeader(this);
367 if (rd < 0)
368 return -1;
369 if (rd >= op->nRows) {
370 if (remWarnings >= 0) {
371 sprintf(errmsg, "recovered output '%s' already done",
372 op->GetName());
373 error(WARNING, remWarnings ? errmsg : "etc...");
374 remWarnings--;
375 }
376 rd = op->nRows;
377 }
378 if (!rInPos | (rInPos > rd)) rInPos = rd;
379 } else if (!op->NewHeader(this))
380 return -1;
381 // make sure there's room
382 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
383 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
384 return -1; // calls error() for us
385 }
386 rowsDone.NewBitMap(outList->nRows); // create row completion map
387 rowsDone.ClearBits(0, rInPos, true);
388 return rInPos;
389 }
390
391 // Create header in open write-only channel
392 bool
393 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
394 {
395 int headlim = rcp->GetHeadLen() + 1024;
396 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
397 int esiz;
398 const int etyp = rcp->GetFormat(&esiz);
399
400 strcpy(hdr, HDRSTR);
401 strcat(hdr, "RADIANCE\n");
402 begData = strlen(hdr);
403 strcpy(hdr+begData, rcp->GetHeadStr());
404 begData += rcp->GetHeadLen();
405 if (omod) {
406 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
407 begData += strlen(hdr+begData);
408 }
409 if (obin >= 0) {
410 sprintf(hdr+begData, "BIN=%d\n", obin);
411 begData += strlen(hdr+begData);
412 }
413 strcpy(hdr+begData, ROWZEROSTR);
414 rowCountPos = begData+LNROWSTR;
415 begData += sizeof(ROWZEROSTR)-1;
416 if (!rcp->xres | (rowBytes > esiz)) {
417 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
418 begData += strlen(hdr+begData);
419 }
420 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
421 begData += strlen(hdr+begData);
422 if (NCSAMP > 3) {
423 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
424 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
425 begData += strlen(hdr+begData);
426 }
427 if (etyp != 'c') {
428 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
429 begData += strlen(hdr+begData);
430 }
431 int align = 0;
432 switch (etyp) {
433 case 'f':
434 align = sizeof(float);
435 break;
436 case 'd':
437 align = sizeof(double);
438 break;
439 case 'c':
440 break;
441 default:
442 error(INTERNAL, "unsupported data type in NewHeader()");
443 return false;
444 }
445 strcpy(hdr+begData, FMTSTR); // write format string
446 begData += strlen(hdr+begData);
447 strcpy(hdr+begData, formstr(etyp));
448 begData += strlen(hdr+begData);
449 if (align) // align data at end of header
450 while ((begData+2) % align)
451 hdr[begData++] = ' ';
452 hdr[begData++] = '\n'; // EOL for data format
453 hdr[begData++] = '\n'; // end of nominal header
454 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
455 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
456 begData += strlen(hdr+begData);
457 }
458 return rData->ReleaseMemory(hdr, RDSwrite);
459 }
460
461 // find string argument in header for named variable
462 static const char *
463 findArgs(const char *hdr, const char *vnm, int len)
464 {
465 const char * npos = strnstr(hdr, vnm, len);
466
467 if (!npos) return NULL;
468
469 npos += strlen(vnm); // find start of (first) argument
470 len -= npos - hdr;
471 while (len-- > 0 && (*npos == '=') | isspace(*npos))
472 if (*npos++ == '\n')
473 return NULL;
474
475 return (len >= 0) ? npos : NULL;
476 }
477
478 // Load and check header in read/write channel
479 int
480 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
481 {
482 int esiz;
483 const int etyp = rcp->GetFormat(&esiz);
484 const int maxlen = rcp->GetHeadLen() + 1024;
485 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
486 const char * cp;
487 // find end of header
488 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
489 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
490 error(USER, errmsg);
491 return -1;
492 }
493 begData = cp - hdr + 1; // increment again at end
494 // check # components
495 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
496 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
497 error(USER, errmsg);
498 return -1;
499 }
500 // check format
501 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
502 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
503 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
504 error(USER, errmsg);
505 return -1;
506 }
507 // check #columns
508 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
509 !rcp->xres | (rowBytes > esiz)) {
510 sprintf(errmsg, "expected NCOLS=%d in '%s'",
511 int(rowBytes/esiz), GetName());
512 error(USER, errmsg);
513 return -1;
514 }
515 // find row count
516 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
517 sprintf(errmsg, "missing NROWS in '%s'", GetName());
518 error(USER, errmsg);
519 return -1;
520 }
521 rowCountPos = cp - hdr;
522 int rlast = atoi(cp);
523 begData++; // advance past closing EOL
524 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
525 char rbuf[64];
526 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
527 int rlen = strlen(rbuf);
528 if (strncmp(rbuf, hdr+begData, rlen)) {
529 sprintf(errmsg, "bad resolution string in '%s'", GetName());
530 error(USER, errmsg);
531 return -1;
532 }
533 begData += rlen;
534 }
535 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
536 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
537 }
538
539 // Rewind calculation (previous results unchanged)
540 bool
541 RcontribSimulManager::ResetRow(int r)
542 {
543 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
544 error(WARNING, "ignoring bad call to ResetRow()");
545 return (r == rInPos);
546 }
547 FlushQueue(); // finish current and reset
548 for (RcontribOutput *op = outList; op; op = op->next)
549 if (!op->SetRowsDone(r))
550 return false;
551
552 rowsDone.ClearBits(r, rInPos-r, false);
553 rInPos = r;
554 return true;
555 }
556
557 // call-back for averaging each modifier's contributions to assigned channel(s)
558 static int
559 putModContrib(const LUENT *lp, void *p)
560 {
561 RcontribMod * mp = (RcontribMod *)lp->data;
562 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
563 const double sca = 1./rcp->accum;
564 const DCOLORV * dvp = mp->cbin;
565 RcontribOutput * op = mp->opl;
566 int i, n;
567
568 switch (rcp->GetFormat()) { // conversion based on output type
569 case 'd': {
570 double * dvo = (double *)op->InsertionP(mp->coffset);
571 for (n = mp->nbins; n--; ) {
572 for (i = 0; i < NCSAMP; i++)
573 *dvo++ = *dvp++ * sca;
574 if ((op->obin >= 0) & (n > 0))
575 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
576 }
577 } break;
578 case 'f': {
579 float * fvo = (float *)op->InsertionP(mp->coffset);
580 for (n = mp->nbins; n--; ) {
581 for (i = 0; i < NCSAMP; i++)
582 *fvo++ = float(*dvp++ * sca);
583 if ((op->obin >= 0) & (n > 0))
584 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
585 }
586 } break;
587 case 'c': {
588 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
589 for (n = mp->nbins; n--; ) {
590 SCOLOR scol;
591 for (i = 0; i < NCSAMP; i++)
592 scol[i] = COLORV(*dvp++ * sca);
593 scolor_scolr(cvo, scol);
594 cvo += LSCOLR;
595 if ((op->obin >= 0) & (n > 0))
596 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
597 }
598 } break;
599 default:
600 error(CONSISTENCY, "unsupported output type in sendModContrib()");
601 return -1;
602 }
603 // clear for next tally
604 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
605 return 1;
606 }
607
608 // Add a ray/bundle to compute next record
609 int
610 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
611 {
612 if (!Ready())
613 return 0;
614 if (rInPos >= outList->nRows) {
615 error(WARNING, "ComputeRecord() called after last record");
616 return 0;
617 }
618 if (nkids > 0) { // in parent process?
619 int k = GetChild(); // updates output rows
620 if (k < 0) return -1; // can't really happen
621 RowAssignment rass;
622 rass.row = kidRow[k] = rInPos++;
623 rass.ac = accum;
624 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
625 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
626 != sizeof(FVECT)*2*accum) {
627 error(SYSTEM, "cannot write to child; dead process?");
628 return -1;
629 }
630 return accum; // tracing/output happens in child
631 }
632 if (NCSAMP != nChan) {
633 error(INTERNAL, "number of color channels must remain fixed");
634 return -1;
635 }
636 // actual work is done here...
637 if (EnqueueBundle(orig_direc, accum) < 0)
638 return -1;
639
640 RcontribOutput * op; // prepare output buffers
641 for (op = outList; op; op = op->next)
642 if (!op->GetRow(rInPos))
643 return -1;
644 // convert averages & clear
645 if (lu_doall(&modLUT, putModContrib, this) < 0)
646 return -1;
647 // write buffers
648 for (op = outList; op; op = op->next)
649 op->DoneRow();
650
651 if (!nkids) // update row if solo process
652 UpdateRowsDone(rInPos++); // XXX increment here is critical
653
654 return accum;
655 }
656
657 // Get next available child, returning index or -1 if forceWait & idling
658 int
659 RcontribSimulManager::GetChild(bool forceWait)
660 {
661 if (nkids <= 0)
662 return -1;
663 // take inventory
664 int pn, n = 0;
665 fd_set writeset, errset;
666 FD_ZERO(&writeset); FD_ZERO(&errset);
667 for (pn = nkids; pn--; ) {
668 if (kidRow[pn] < 0) { // child already ready?
669 if (forceWait) continue;
670 return pn; // good enough
671 }
672 FD_SET(kid[pn].w, &writeset); // will check on this one
673 FD_SET(kid[pn].w, &errset);
674 if (kid[pn].w >= n)
675 n = kid[pn].w + 1;
676 }
677 if (!n) // every child is idle?
678 return -1;
679 // wait on "busy" child(ren)
680 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
681 if (errno != EINTR) {
682 error(SYSTEM, "select call failed in GetChild()");
683 return -1;
684 }
685 pn = -1; // get flags set by select
686 for (n = nkids; n--; )
687 if (kidRow[n] >= 0 &&
688 FD_ISSET(kid[n].w, &writeset) |
689 FD_ISSET(kid[n].w, &errset)) {
690 // update output row counts
691 if (!FD_ISSET(kid[n].w, &errset))
692 UpdateRowsDone(kidRow[n]);
693 kidRow[n] = -1; // flag it available
694 pn = n;
695 }
696 return pn;
697 }
698
699 // Update row completion bitmap and update outputs (does not change rInPos)
700 bool
701 RcontribSimulManager::UpdateRowsDone(int r)
702 {
703 if (!rowsDone.TestAndSet(r)) {
704 error(WARNING, "redundant call to UpdateRowsDone()");
705 return false;
706 }
707 int nDone = GetRowFinished();
708 if (nDone <= r)
709 return true; // nothing to update, yet
710 for (RcontribOutput *op = outList; op; op = op->next)
711 if (!op->SetRowsDone(nDone))
712 return false;
713 return true; // up-to-date
714 }
715
716 // Run rcontrib child process (never returns)
717 void
718 RcontribSimulManager::RunChild()
719 {
720 FVECT * vecList = NULL;
721 RowAssignment rass;
722 ssize_t nr;
723
724 accum = 0;
725 errno = 0;
726 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
727 if (!rass.ac) {
728 error(CONSISTENCY, "bad accumulator count in child");
729 exit(1);
730 }
731 if (rass.ac > accum)
732 vecList = (FVECT *)erealloc(vecList,
733 sizeof(FVECT)*2*rass.ac);
734 accum = rass.ac;
735 rInPos = rass.row;
736
737 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
738 sizeof(FVECT)*2*accum)
739 break;
740
741 if (ComputeRecord(vecList) <= 0)
742 exit(1);
743 }
744 if (nr) {
745 error(SYSTEM, "read error in child process");
746 exit(1);
747 }
748 exit(0);
749 }
750
751 // Add child processes as indicated
752 bool
753 RcontribSimulManager::StartKids(int n2go)
754 {
755 if ((n2go <= 0) | (nkids + n2go <= 1))
756 return false;
757
758 if (!nkids)
759 cow_memshare(); // preload objects
760
761 n2go += nkids; // => desired brood size
762 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
763 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
764
765 fflush(stdout); // shouldn't use, anyway
766 while (nkids < n2go) {
767 kid[nkids] = sp_inactive;
768 kid[nkids].w = dup(1);
769 kid[nkids].flags |= PF_FILT_OUT;
770 int rv = open_process(&kid[nkids], NULL);
771 if (!rv) { // in child process?
772 while (nkids-- > 0)
773 close(kid[nkids].w);
774 free(kid); free(kidRow);
775 kid = NULL; kidRow = NULL;
776 RunChild(); // should never return
777 _exit(1);
778 }
779 if (rv < 0) {
780 error(SYSTEM, "cannot fork worker process");
781 return false;
782 }
783 kidRow[nkids++] = -1; // newborn is ready
784 }
785 return true;
786 }
787
788 // Reap the indicated number of children (all if 0)
789 int
790 RcontribSimulManager::StopKids(int n2end)
791 {
792 if (nkids <= 0)
793 return 0;
794 FlushQueue();
795 int status = 0;
796 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
797 status = close_processes(kid, nkids);
798 free(kid); free(kidRow);
799 kid = NULL; kidRow = NULL;
800 nkids = 0;
801 } else { // else shrink family
802 int st;
803 while (n2end-- > 0)
804 if ((st = close_process(&kid[--nkids])) > 0)
805 status = st;
806 // could call realloc(), but it hardly matters
807 }
808 if (status) {
809 sprintf(errmsg, "non-zero (%d) status from child", status);
810 error(WARNING, errmsg);
811 }
812 return status;
813 }
814
815 // Set number of computation threads (0 => #cores)
816 int
817 RcontribSimulManager::SetThreadCount(int nt)
818 {
819 if (!Ready()) {
820 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
821 return 0;
822 }
823 if (nt < 0)
824 return NThreads();
825 if (!nt) nt = GetNCores();
826 int status = 0;
827 if (nt == 1)
828 status = StopKids();
829 else if (nt < nkids)
830 status = StopKids(nkids-nt);
831 else if (nt > nkids)
832 StartKids(nt-nkids);
833 if (status) {
834 sprintf(errmsg, "non-zero (%d) status from child", status);
835 error(WARNING, errmsg);
836 }
837 return NThreads();
838 }