ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.8
Committed: Thu Nov 7 18:07:43 2024 UTC (5 months, 3 weeks ago) by greg
Branch: MAIN
Changes since 2.7: +11 -11 lines
Log Message:
fix(rxcontrib): Fixed issues with modifier names in output files

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.7 2024/11/06 19:45:59 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
// Context name handed to worldfunc() before a bin expression is evaluated
char RCCONTEXT[] = "RC.";

// Header strings defined elsewhere; used below when writing and checking
// output file headers
extern const char HDRSTR[];
extern const char BIGEND[];
extern const char FMTSTR[];

// Data-share open flags, indexed by output operation in defDataShare():
// new/exclusive, overwrite if exists, or recover data
int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
                RDSread|RDSwrite};

// Row-count placeholder copied into each new header by NewHeader()
static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
// Offset of the digits within ROWZEROSTR, i.e. the length of "NROWS="
#define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 // Our default data share function
56 RdataShare *
57 defDataShare(const char *name, RCOutputOp op, size_t siz)
58 {
59 return new RdataShareMap(name, RSDOflags[op], siz);
60 }
61
62 // Allocate rcontrib accumulator
63 RcontribMod *
64 NewRcMod(const char *prms, const char *binexpr, int ncbins)
65 {
66 if (!prms) prms = "";
67 if (!binexpr & (ncbins > 1)) {
68 error(USER, "missing bin expression");
69 return NULL;
70 }
71 if (ncbins <= 1) { // shouldn't have bin expression?
72 if (binexpr && strcmp(binexpr, "0"))
73 error(WARNING, "ignoring non-zero expression for single bin");
74 prms = "";
75 binexpr = NULL;
76 ncbins = 1;
77 }
78 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
79 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
80 strlen(prms)+1);
81
82 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
83 if (binexpr) {
84 mp->binv = eparse(const_cast<char *>(binexpr));
85 CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
86 }
87 mp->nbins = ncbins;
88 return mp;
89 }
90
91 // Free an RcontribMod
92 void
93 FreeRcMod(void *p)
94 {
95 if (!p) return;
96 EPNODE * bep = (*(RcontribMod *)p).binv;
97 if (bep) epfree(bep, true);
98 efree(p);
99 }
100
101 // Set output format ('f', 'd', or 'c')
102 bool
103 RcontribSimulManager::SetDataFormat(int ty)
104 {
105 if (outList) {
106 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
107 return false;
108 }
109 switch (ty) {
110 case 'f':
111 dsiz = sizeof(float)*NCSAMP;
112 break;
113 case 'd':
114 dsiz = sizeof(double)*NCSAMP;
115 break;
116 case 'c':
117 dsiz = LSCOLR;
118 break;
119 default:
120 sprintf(errmsg, "unsupported output format '%c'", ty);
121 error(INTERNAL, errmsg);
122 return false;
123 }
124 dtyp = ty;
125 return true;
126 }
127
128 // static call-back for rcontrib ray-tracing
129 int
130 RcontribSimulManager::RctCall(RAY *r, void *cd)
131 {
132 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
133 return 0;
134 // shadow ray not on source?
135 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
136 return 0;
137
138 const char * mname = objptr(r->ro->omod)->oname;
139 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
140 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
141 if (!mp)
142 return 0; // not in our modifier list
143
144 int bi = 0; // get bin index
145 if (mp->binv) {
146 worldfunc(RCCONTEXT, r);
147 set_eparams(mp->params);
148 double bval = evalue(mp->binv);
149 if (bval <= -.5)
150 return 0; // silently ignore negative bin index
151 bi = int(bval + .5);
152 }
153 DCOLORV * dvp = (*mp)[bi];
154 if (!dvp) {
155 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
156 error(WARNING, errmsg);
157 return 0;
158 }
159 SCOLOR contr;
160 raycontrib(contr, r, PRIMARY); // compute coefficient
161 if (rcp->HasFlag(RCcontrib))
162 smultscolor(contr, r->rcol); // -> value contribution
163 for (int i = 0; i < NCSAMP; i++)
164 *dvp++ += contr[i]; // accumulate color/spectrum
165 return 1;
166 }
167
// Look for the first printf-style conversion in tst whose conversion
// character is one of those in match.
// Returns the index in tst of that conversion character, or 0 if there is
// no such conversion (including a specification cut off by end of string).
static int
hasFormat(const char *tst, const char *match)
{
    static const char   convChars[] = "%diouxXfFeEgGaAcsb";
    const char *        cp = tst;

    while ((cp = strchr(cp, '%')) != NULL) {
        ++cp;                           // step over the '%'
        cp += strcspn(cp, convChars);   // skip flags, width, precision, length
        if (*cp == '\0')
            return 0;                   // specification never completed
        if (strchr(match, *cp))
            return int(cp - tst);       // found a wanted conversion
        ++cp;                           // some other conversion; keep looking
    }
    return 0;
}
185
// Add a modifier and arguments, opening output file(s)
//  modn     modifier name to track (must be non-empty)
//  outspec  output name, optionally a format containing a string conversion
//           (%s or %b) for the modifier name and/or an integer conversion
//           for the bin number
//  prms     parameter string passed on to NewRcMod()
//  binval   bin expression passed on to NewRcMod()
//  bincnt   number of bins passed on to NewRcMod()
// Returns true on success, false after reporting an error or warning.
// Output channels are only added to outList here; nothing in this function
// touches the channels' data shares.
bool
RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
                const char *prms, const char *binval, int bincnt)
{
    // reject NULL or empty modifier name / output spec
    if (!modn | !outspec || !*modn | !*outspec) {
        error(WARNING, "ignoring bad call to AddModifier()");
        return false;
    }
    if (!nChan) { // initial call?
        if ((xres < 0) | (yres <= 0)) {
            error(USER, "xres, yres must be set before first modifier");
            return false;
        }
        // re-apply current format so dsiz reflects the current NCSAMP
        if (!SetDataFormat(dtyp))
            return false;
        nChan = NCSAMP;
    } else if (nChan != NCSAMP) {
        error(USER, "number of spectral channels must be fixed");
        return false;
    }
    if (Ready()) {
        error(INTERNAL, "call to AddModifier() after PrepOutput()");
        return false;
    }
    // each modifier may be registered only once
    LUENT * lp = lu_find(&modLUT, modn);
    if (lp->data) {
        sprintf(errmsg, "duplicate modifier '%s'", modn);
        error(USER, errmsg);
        return false;
    }
    if (!lp->key) // create new entry
        lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);

    RcontribMod * mp = NewRcMod(prms, binval, bincnt);
    if (!mp) return false;
    lp->data = (char *)mp;
    // positions of the modifier-name and bin-number conversions (0 if absent)
    const int modndx = hasFormat(outspec, "sb");
    const int binndx = hasFormat(outspec, "diouxX");
    int bin0 = 0;
    char fnbuf[512];
    // argument order follows whichever conversion appears first in outspec
    if (!modndx || (binndx > 0) & (modndx > binndx))
        sprintf(fnbuf, outspec, bin0, modn);
    else
        sprintf(fnbuf, outspec, modn, bin0);
    // look for an existing output channel with the same name
    RcontribOutput * olast = NULL;
    RcontribOutput * op;
    for (op = outList; op; op = (olast=op)->next) {
        if (strcmp(op->GetName(), fnbuf))
            continue;
        if (modndx) { // this ain't right
            sprintf(errmsg, "output name collision for '%s'", fnbuf);
            error(USER, errmsg);
            return false;
        }
        if ((binndx > 0) & (xres > 0)) {
            // name taken: try the next bin number against remaining outputs
            sprintf(fnbuf, outspec, ++bin0);
            continue; // each bin goes to another image
        }
        mp->coffset = op->rowBytes; // else add to what's there
        break;
    }
    if (!op) { // create new output channel?
        op = new RcontribOutput(fnbuf);
        if (olast) olast->next = op;
        else outList = op;
    }
    mp->opl = op; // first (maybe only) output channel
    if (modndx) // remember modifier if part of name
        op->omod = lp->key;
    if (binndx) { // append output image/bin list
        op->rowBytes += dsiz;
        op->obin = bin0;
        // remaining bins each go to the following channel, created if needed
        for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
            if (!modndx || (binndx > 0) & (modndx > binndx))
                sprintf(fnbuf, outspec, bin0+bincnt, modn);
            else
                sprintf(fnbuf, outspec, modn, bin0+bincnt);
            if (!op->next) {
                olast = op;
                olast->next = op = new RcontribOutput(fnbuf);
                if (modndx) op->omod = lp->key;
                op->obin = bin0+bincnt;
            } else {
                // reusing an existing channel: its bin number must line up
                op = op->next;
                CHECK(op->obin != bin0+bincnt, CONSISTENCY,
                        "bin number mismatch in AddModifier()");
            }
            // every channel of this modifier must share the same column offset
            CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
                    "row offset mismatch in AddModifier()");
            op->rowBytes += dsiz;
        }
    } else // else send all results to this channel
        op->rowBytes += mp->nbins*dsiz;
    return true;
}
282
283 // Add a file of modifiers with associated arguments
284 bool
285 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
286 const char *prms,
287 const char *binval, int bincnt)
288 {
289 char * path = getpath(const_cast<char *>(modfn),
290 getrlibpath(), R_OK);
291 FILE * fp;
292 if (!path || !(fp = fopen(path, "r"))) {
293 sprintf(errmsg, "cannot %s modifier file '%s'",
294 path ? "open" : "find", modfn);
295 error(SYSTEM, errmsg);
296 return false;
297 }
298 char mod[MAXSTR];
299 while (fgetword(mod, sizeof(mod), fp))
300 if (!AddModifier(mod, outspec, prms, binval, bincnt))
301 return false;
302 fclose(fp);
303 return true;
304 }
305
306 // call-back to check if modifier has been loaded
307 static int
308 checkModExists(const LUENT *lp, void *p)
309 {
310 if (modifier(lp->key) != OVOID)
311 return 1;
312
313 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
314 error(WARNING, errmsg);
315 return 0;
316 }
317
318 // Prepare output channels and return # completed rows
319 int
320 RcontribSimulManager::PrepOutput()
321 {
322 if (!outList || !RtraceSimulManager::Ready()) {
323 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
324 return -1;
325 }
326 if (!cdsF) {
327 error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
328 return -1;
329 }
330 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
331 return -1;
332
333 int remWarnings = 20;
334 for (RcontribOutput *op = outList; op; op = op->next) {
335 if (op->rData) {
336 error(INTERNAL, "output channel already open in PrepOutput()");
337 return -1;
338 }
339 op->nRows = yres * (xres + !xres);
340 op->rData = (*cdsF)(op->ofname, outOp,
341 GetHeadLen()+1024 + op->nRows*op->rowBytes);
342 freeqstr(op->ofname); op->ofname = NULL;
343 if (outOp == RCOrecover) {
344 int rd = op->CheckHeader(this);
345 if (rd < 0)
346 return -1;
347 if (rd >= op->nRows) {
348 if (remWarnings >= 0) {
349 sprintf(errmsg, "recovered output '%s' already done",
350 op->GetName());
351 error(WARNING, remWarnings ? errmsg : "etc...");
352 remWarnings--;
353 }
354 rd = op->nRows;
355 }
356 if (!rInPos | (rInPos > rd)) rInPos = rd;
357 } else if (!op->NewHeader(this))
358 return -1;
359 // make sure there's room
360 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
361 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
362 return -1; // calls error() for us
363 }
364 rowsDone.NewBitMap(outList->nRows); // create row completion map
365 rowsDone.ClearBits(0, rInPos, true);
366 return rInPos;
367 }
368
369 // Create header in open write-only channel
370 bool
371 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
372 {
373 int headlim = rcp->GetHeadLen() + 1024;
374 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
375 int esiz;
376 const int etyp = rcp->GetFormat(&esiz);
377
378 strcpy(hdr, HDRSTR);
379 strcat(hdr, "RADIANCE\n");
380 begData = strlen(hdr);
381 strcpy(hdr+begData, rcp->GetHeadStr());
382 begData += rcp->GetHeadLen();
383 if (omod) {
384 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
385 begData += strlen(hdr+begData);
386 }
387 if (obin >= 0) {
388 sprintf(hdr+begData, "BIN=%d\n", obin);
389 begData += strlen(hdr+begData);
390 }
391 strcpy(hdr+begData, ROWZEROSTR);
392 rowCountPos = begData+LNROWSTR;
393 begData += sizeof(ROWZEROSTR)-1;
394 if (!rcp->xres | (rowBytes > esiz)) {
395 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
396 begData += strlen(hdr+begData);
397 }
398 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
399 begData += strlen(hdr+begData);
400 if (NCSAMP > 3) {
401 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
402 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
403 begData += strlen(hdr+begData);
404 }
405 if (etyp != 'c') {
406 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
407 begData += strlen(hdr+begData);
408 }
409 int align = 0;
410 switch (etyp) {
411 case 'f':
412 align = sizeof(float);
413 break;
414 case 'd':
415 align = sizeof(double);
416 break;
417 case 'c':
418 break;
419 default:
420 error(INTERNAL, "unsupported data type in NewHeader()");
421 return false;
422 }
423 strcpy(hdr+begData, FMTSTR); // write format string
424 begData += strlen(hdr+begData);
425 strcpy(hdr+begData, formstr(etyp));
426 begData += strlen(hdr+begData);
427 if (align) // align data at end of header
428 while ((begData+2) % align)
429 hdr[begData++] = ' ';
430 hdr[begData++] = '\n'; // EOL for data format
431 hdr[begData++] = '\n'; // end of nominal header
432 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
433 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
434 begData += strlen(hdr+begData);
435 }
436 return rData->ReleaseMemory(hdr, RDSwrite);
437 }
438
439 // find string argument in header for named variable
440 static const char *
441 findArgs(const char *hdr, const char *vnm, int len)
442 {
443 const char * npos = strnstr(hdr, vnm, len);
444
445 if (!npos) return NULL;
446
447 npos += strlen(vnm); // find start of (first) argument
448 len -= npos - hdr;
449 while (len-- > 0 && (*npos == '=') | isspace(*npos))
450 if (*npos++ == '\n')
451 return NULL;
452
453 return (len >= 0) ? npos : NULL;
454 }
455
456 // Load and check header in read/write channel
457 int
458 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
459 {
460 int esiz;
461 const int etyp = rcp->GetFormat(&esiz);
462 const int maxlen = rcp->GetHeadLen() + 1024;
463 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
464 const char * cp;
465 // find end of header
466 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
467 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
468 error(USER, errmsg);
469 return -1;
470 }
471 begData = cp - hdr + 1; // increment again at end
472 // check # components
473 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
474 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
475 error(USER, errmsg);
476 return -1;
477 }
478 // check format
479 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
480 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
481 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
482 error(USER, errmsg);
483 return -1;
484 }
485 // check #columns
486 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
487 !rcp->xres | (rowBytes > esiz)) {
488 sprintf(errmsg, "expected NCOLS=%d in '%s'",
489 int(rowBytes/esiz), GetName());
490 error(USER, errmsg);
491 return -1;
492 }
493 // find row count
494 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
495 sprintf(errmsg, "missing NROWS in '%s'", GetName());
496 error(USER, errmsg);
497 return -1;
498 }
499 rowCountPos = cp - hdr;
500 int rlast = atoi(cp);
501 begData++; // advance past closing EOL
502 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
503 char rbuf[64];
504 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
505 int rlen = strlen(rbuf);
506 if (strncmp(rbuf, hdr+begData, rlen)) {
507 sprintf(errmsg, "bad resolution string in '%s'", GetName());
508 error(USER, errmsg);
509 return -1;
510 }
511 begData += rlen;
512 }
513 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
514 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
515 }
516
517 // Rewind calculation (previous results unchanged)
518 bool
519 RcontribSimulManager::ResetRow(int r)
520 {
521 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
522 error(WARNING, "ignoring bad call to ResetRow()");
523 return (r == rInPos);
524 }
525 FlushQueue(); // finish current and reset
526 for (RcontribOutput *op = outList; op; op = op->next)
527 if (!op->SetRowsDone(r))
528 return false;
529
530 rowsDone.ClearBits(r, rInPos-r, false);
531 rInPos = r;
532 return true;
533 }
534
535 // call-back for averaging each modifier's contributions to assigned channel(s)
536 static int
537 putModContrib(const LUENT *lp, void *p)
538 {
539 RcontribMod * mp = (RcontribMod *)lp->data;
540 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
541 const double sca = 1./rcp->accum;
542 const DCOLORV * dvp = mp->cbin;
543 RcontribOutput * op = mp->opl;
544 int i, n;
545
546 switch (rcp->GetFormat()) { // conversion based on output type
547 case 'd': {
548 double * dvo = (double *)op->InsertionP(mp->coffset);
549 for (n = mp->nbins; n--; ) {
550 for (i = 0; i < NCSAMP; i++)
551 *dvo++ = *dvp++ * sca;
552 if ((op->obin >= 0) & (n > 0))
553 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
554 }
555 } break;
556 case 'f': {
557 float * fvo = (float *)op->InsertionP(mp->coffset);
558 for (n = mp->nbins; n--; ) {
559 for (i = 0; i < NCSAMP; i++)
560 *fvo++ = float(*dvp++ * sca);
561 if ((op->obin >= 0) & (n > 0))
562 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
563 }
564 } break;
565 case 'c': {
566 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
567 for (n = mp->nbins; n--; ) {
568 SCOLOR scol;
569 for (i = 0; i < NCSAMP; i++)
570 scol[i] = COLORV(*dvp++ * sca);
571 scolor_scolr(cvo, scol);
572 cvo += LSCOLR;
573 if ((op->obin >= 0) & (n > 0))
574 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
575 }
576 } break;
577 default:
578 error(CONSISTENCY, "unsupported output type in sendModContrib()");
579 return -1;
580 }
581 // clear for next tally
582 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
583 return 1;
584 }
585
586 // Add a ray/bundle to compute next record
587 int
588 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
589 {
590 if (!Ready())
591 return 0;
592 if (rInPos >= outList->nRows) {
593 error(WARNING, "ComputeRecord() called after last record");
594 return 0;
595 }
596 if (nkids > 0) { // in parent process?
597 int k = GetChild(); // updates output rows
598 if (k < 0) return -1; // can't really happen
599 RowAssignment rass;
600 rass.row = kidRow[k] = rInPos++;
601 rass.ac = accum;
602 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
603 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
604 != sizeof(FVECT)*2*accum) {
605 error(SYSTEM, "cannot write to child; dead process?");
606 return -1;
607 }
608 return accum; // tracing/output happens in child
609 }
610 if (NCSAMP != nChan) {
611 error(INTERNAL, "number of color channels must remain fixed");
612 return -1;
613 }
614 // actual work is done here...
615 if (EnqueueBundle(orig_direc, accum) < 0)
616 return -1;
617
618 RcontribOutput * op; // prepare output buffers
619 for (op = outList; op; op = op->next)
620 if (!op->GetRow(rInPos))
621 return -1;
622 // convert averages & clear
623 if (lu_doall(&modLUT, putModContrib, this) < 0)
624 return -1;
625 // write buffers
626 for (op = outList; op; op = op->next)
627 op->DoneRow();
628
629 if (!nkids) // update row if solo process
630 UpdateRowsDone(rInPos++); // XXX increment here is critical
631
632 return accum;
633 }
634
635 // Get next available child, returning index or -1 if forceWait & idling
636 int
637 RcontribSimulManager::GetChild(bool forceWait)
638 {
639 if (nkids <= 0)
640 return -1;
641 // take inventory
642 int pn, n = 0;
643 fd_set writeset, errset;
644 FD_ZERO(&writeset); FD_ZERO(&errset);
645 for (pn = nkids; pn--; ) {
646 if (kidRow[pn] < 0) { // child already ready?
647 if (forceWait) continue;
648 return pn; // good enough
649 }
650 FD_SET(kid[pn].w, &writeset); // will check on this one
651 FD_SET(kid[pn].w, &errset);
652 if (kid[pn].w >= n)
653 n = kid[pn].w + 1;
654 }
655 if (!n) // every child is idle?
656 return -1;
657 // wait on "busy" child(ren)
658 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
659 if (errno != EINTR) {
660 error(SYSTEM, "select call failed in GetChild()");
661 return -1;
662 }
663 pn = -1; // get flags set by select
664 for (n = nkids; n--; )
665 if (kidRow[n] >= 0 &&
666 FD_ISSET(kid[n].w, &writeset) |
667 FD_ISSET(kid[n].w, &errset)) {
668 // update output row counts
669 if (!FD_ISSET(kid[n].w, &errset))
670 UpdateRowsDone(kidRow[n]);
671 kidRow[n] = -1; // flag it available
672 pn = n;
673 }
674 return pn;
675 }
676
677 // Update row completion bitmap and update outputs (does not change rInPos)
678 bool
679 RcontribSimulManager::UpdateRowsDone(int r)
680 {
681 if (!rowsDone.TestAndSet(r)) {
682 error(WARNING, "redundant call to UpdateRowsDone()");
683 return false;
684 }
685 int nDone = GetRowFinished();
686 if (nDone <= r)
687 return true; // nothing to update, yet
688 for (RcontribOutput *op = outList; op; op = op->next)
689 if (!op->SetRowsDone(nDone))
690 return false;
691 return true; // up-to-date
692 }
693
694 // Run rcontrib child process (never returns)
695 void
696 RcontribSimulManager::RunChild()
697 {
698 FVECT * vecList = NULL;
699 RowAssignment rass;
700 ssize_t nr;
701
702 accum = 0;
703 errno = 0;
704 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
705 if (!rass.ac) {
706 error(CONSISTENCY, "bad accumulator count in child");
707 exit(1);
708 }
709 if (rass.ac > accum)
710 vecList = (FVECT *)erealloc(vecList,
711 sizeof(FVECT)*2*rass.ac);
712 accum = rass.ac;
713 rInPos = rass.row;
714
715 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
716 sizeof(FVECT)*2*accum)
717 break;
718
719 if (ComputeRecord(vecList) <= 0)
720 exit(1);
721 }
722 if (nr) {
723 error(SYSTEM, "read error in child process");
724 exit(1);
725 }
726 exit(0);
727 }
728
729 // Add child processes as indicated
730 bool
731 RcontribSimulManager::StartKids(int n2go)
732 {
733 if ((n2go <= 0) | (nkids + n2go <= 1))
734 return false;
735
736 if (!nkids)
737 cow_memshare(); // preload objects
738
739 n2go += nkids; // => desired brood size
740 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
741 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
742
743 fflush(stdout); // shouldn't use, anyway
744 while (nkids < n2go) {
745 kid[nkids] = sp_inactive;
746 kid[nkids].w = dup(1);
747 kid[nkids].flags |= PF_FILT_OUT;
748 int rv = open_process(&kid[nkids], NULL);
749 if (!rv) { // in child process?
750 while (nkids-- > 0)
751 close(kid[nkids].w);
752 free(kid); free(kidRow);
753 kid = NULL; kidRow = NULL;
754 RunChild(); // should never return
755 _exit(1);
756 }
757 if (rv < 0) {
758 error(SYSTEM, "cannot fork worker process");
759 return false;
760 }
761 kidRow[nkids++] = -1; // newborn is ready
762 }
763 return true;
764 }
765
766 // Reap the indicated number of children (all if 0)
767 int
768 RcontribSimulManager::StopKids(int n2end)
769 {
770 if (nkids <= 0)
771 return 0;
772 FlushQueue();
773 int status = 0;
774 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
775 status = close_processes(kid, nkids);
776 free(kid); free(kidRow);
777 kid = NULL; kidRow = NULL;
778 nkids = 0;
779 } else { // else shrink family
780 int st;
781 while (n2end-- > 0)
782 if ((st = close_process(&kid[--nkids])) > 0)
783 status = st;
784 // could call realloc(), but it hardly matters
785 }
786 if (status) {
787 sprintf(errmsg, "non-zero (%d) status from child", status);
788 error(WARNING, errmsg);
789 }
790 return status;
791 }
792
793 // Set number of computation threads (0 => #cores)
794 int
795 RcontribSimulManager::SetThreadCount(int nt)
796 {
797 if (!Ready()) {
798 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
799 return 0;
800 }
801 if (nt < 0)
802 return NThreads();
803 if (!nt) nt = GetNCores();
804 int status = 0;
805 if (nt == 1)
806 status = StopKids();
807 else if (nt < nkids)
808 status = StopKids(nkids-nt);
809 else if (nt > nkids)
810 StartKids(nt-nkids);
811 if (status) {
812 sprintf(errmsg, "non-zero (%d) status from child", status);
813 error(WARNING, errmsg);
814 }
815 return NThreads();
816 }