ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.9
Committed: Tue Dec 3 17:39:42 2024 UTC (5 months, 2 weeks ago) by greg
Branch: MAIN
Changes since 2.8: +14 -1 lines
Log Message:
refactor(rxcontrib): Moved definition of formstr() to make it more accessible

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.8 2024/11/07 18:07:43 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 // new/exclusive, overwrite if exists, or recover data
28 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
29 RDSread|RDSwrite};
30
31 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
32 #define LNROWSTR 6
33
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
49 // Struct used to assign record calculation to child
50 struct RowAssignment {
51 uint32 row; // row to do
52 uint32 ac; // accumulation count
53 };
54
55 // Get format identifier
56 const char *
57 formstr(int f)
58 {
59 switch (f) {
60 case 'a': return("ascii");
61 case 'f': return("float");
62 case 'd': return("double");
63 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
64 }
65 return("unknown");
66 }
67
68 // Our default data share function
69 RdataShare *
70 defDataShare(const char *name, RCOutputOp op, size_t siz)
71 {
72 return new RdataShareMap(name, RSDOflags[op], siz);
73 }
74
75 // Allocate rcontrib accumulator
76 RcontribMod *
77 NewRcMod(const char *prms, const char *binexpr, int ncbins)
78 {
79 if (!prms) prms = "";
80 if (!binexpr & (ncbins > 1)) {
81 error(USER, "missing bin expression");
82 return NULL;
83 }
84 if (ncbins <= 1) { // shouldn't have bin expression?
85 if (binexpr && strcmp(binexpr, "0"))
86 error(WARNING, "ignoring non-zero expression for single bin");
87 prms = "";
88 binexpr = NULL;
89 ncbins = 1;
90 }
91 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
92 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
93 strlen(prms)+1);
94
95 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
96 if (binexpr) {
97 mp->binv = eparse(const_cast<char *>(binexpr));
98 CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
99 }
100 mp->nbins = ncbins;
101 return mp;
102 }
103
104 // Free an RcontribMod
105 void
106 FreeRcMod(void *p)
107 {
108 if (!p) return;
109 EPNODE * bep = (*(RcontribMod *)p).binv;
110 if (bep) epfree(bep, true);
111 efree(p);
112 }
113
114 // Set output format ('f', 'd', or 'c')
115 bool
116 RcontribSimulManager::SetDataFormat(int ty)
117 {
118 if (outList) {
119 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
120 return false;
121 }
122 switch (ty) {
123 case 'f':
124 dsiz = sizeof(float)*NCSAMP;
125 break;
126 case 'd':
127 dsiz = sizeof(double)*NCSAMP;
128 break;
129 case 'c':
130 dsiz = LSCOLR;
131 break;
132 default:
133 sprintf(errmsg, "unsupported output format '%c'", ty);
134 error(INTERNAL, errmsg);
135 return false;
136 }
137 dtyp = ty;
138 return true;
139 }
140
141 // static call-back for rcontrib ray-tracing
142 int
143 RcontribSimulManager::RctCall(RAY *r, void *cd)
144 {
145 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
146 return 0;
147 // shadow ray not on source?
148 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
149 return 0;
150
151 const char * mname = objptr(r->ro->omod)->oname;
152 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
153 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
154 if (!mp)
155 return 0; // not in our modifier list
156
157 int bi = 0; // get bin index
158 if (mp->binv) {
159 worldfunc(RCCONTEXT, r);
160 set_eparams(mp->params);
161 double bval = evalue(mp->binv);
162 if (bval <= -.5)
163 return 0; // silently ignore negative bin index
164 bi = int(bval + .5);
165 }
166 DCOLORV * dvp = (*mp)[bi];
167 if (!dvp) {
168 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
169 error(WARNING, errmsg);
170 return 0;
171 }
172 SCOLOR contr;
173 raycontrib(contr, r, PRIMARY); // compute coefficient
174 if (rcp->HasFlag(RCcontrib))
175 smultscolor(contr, r->rcol); // -> value contribution
176 for (int i = 0; i < NCSAMP; i++)
177 *dvp++ += contr[i]; // accumulate color/spectrum
178 return 1;
179 }
180
// Look for a printf-style conversion in tst whose conversion character is one
// of those in match.  Returns the index of that conversion character within
// tst, or 0 if there is no such conversion.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
		while (*cp && *cp != '%')	// advance to next '%'
			cp++;
		if (!*cp)
			return 0;		// ran out of string
		cp++;				// step over the '%'
						// skip flags/width; strchr() also stops us at NUL
		while (!strchr(convChars, *cp))
			cp++;
		if (!*cp)
			return 0;		// unterminated conversion
		if (strchr(match, *cp))
			return int(cp - tst);	// found one we want
		cp++;				// some other conversion, keep looking
	}
}
198
199 // Add a modifier and arguments, opening output file(s)
200 bool
201 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
202 const char *prms, const char *binval, int bincnt)
203 {
204 if (!modn | !outspec || !*modn | !*outspec) {
205 error(WARNING, "ignoring bad call to AddModifier()");
206 return false;
207 }
208 if (!nChan) { // initial call?
209 if ((xres < 0) | (yres <= 0)) {
210 error(USER, "xres, yres must be set before first modifier");
211 return false;
212 }
213 if (!SetDataFormat(dtyp))
214 return false;
215 nChan = NCSAMP;
216 } else if (nChan != NCSAMP) {
217 error(USER, "number of spectral channels must be fixed");
218 return false;
219 }
220 if (Ready()) {
221 error(INTERNAL, "call to AddModifier() after PrepOutput()");
222 return false;
223 }
224 LUENT * lp = lu_find(&modLUT, modn);
225 if (lp->data) {
226 sprintf(errmsg, "duplicate modifier '%s'", modn);
227 error(USER, errmsg);
228 return false;
229 }
230 if (!lp->key) // create new entry
231 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
232
233 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
234 if (!mp) return false;
235 lp->data = (char *)mp;
236 const int modndx = hasFormat(outspec, "sb");
237 const int binndx = hasFormat(outspec, "diouxX");
238 int bin0 = 0;
239 char fnbuf[512];
240 if (!modndx || (binndx > 0) & (modndx > binndx))
241 sprintf(fnbuf, outspec, bin0, modn);
242 else
243 sprintf(fnbuf, outspec, modn, bin0);
244 RcontribOutput * olast = NULL;
245 RcontribOutput * op;
246 for (op = outList; op; op = (olast=op)->next) {
247 if (strcmp(op->GetName(), fnbuf))
248 continue;
249 if (modndx) { // this ain't right
250 sprintf(errmsg, "output name collision for '%s'", fnbuf);
251 error(USER, errmsg);
252 return false;
253 }
254 if ((binndx > 0) & (xres > 0)) {
255 sprintf(fnbuf, outspec, ++bin0);
256 continue; // each bin goes to another image
257 }
258 mp->coffset = op->rowBytes; // else add to what's there
259 break;
260 }
261 if (!op) { // create new output channel?
262 op = new RcontribOutput(fnbuf);
263 if (olast) olast->next = op;
264 else outList = op;
265 }
266 mp->opl = op; // first (maybe only) output channel
267 if (modndx) // remember modifier if part of name
268 op->omod = lp->key;
269 if (binndx) { // append output image/bin list
270 op->rowBytes += dsiz;
271 op->obin = bin0;
272 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
273 if (!modndx || (binndx > 0) & (modndx > binndx))
274 sprintf(fnbuf, outspec, bin0+bincnt, modn);
275 else
276 sprintf(fnbuf, outspec, modn, bin0+bincnt);
277 if (!op->next) {
278 olast = op;
279 olast->next = op = new RcontribOutput(fnbuf);
280 if (modndx) op->omod = lp->key;
281 op->obin = bin0+bincnt;
282 } else {
283 op = op->next;
284 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
285 "bin number mismatch in AddModifier()");
286 }
287 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
288 "row offset mismatch in AddModifier()");
289 op->rowBytes += dsiz;
290 }
291 } else // else send all results to this channel
292 op->rowBytes += mp->nbins*dsiz;
293 return true;
294 }
295
296 // Add a file of modifiers with associated arguments
297 bool
298 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
299 const char *prms,
300 const char *binval, int bincnt)
301 {
302 char * path = getpath(const_cast<char *>(modfn),
303 getrlibpath(), R_OK);
304 FILE * fp;
305 if (!path || !(fp = fopen(path, "r"))) {
306 sprintf(errmsg, "cannot %s modifier file '%s'",
307 path ? "open" : "find", modfn);
308 error(SYSTEM, errmsg);
309 return false;
310 }
311 char mod[MAXSTR];
312 while (fgetword(mod, sizeof(mod), fp))
313 if (!AddModifier(mod, outspec, prms, binval, bincnt))
314 return false;
315 fclose(fp);
316 return true;
317 }
318
319 // call-back to check if modifier has been loaded
320 static int
321 checkModExists(const LUENT *lp, void *p)
322 {
323 if (modifier(lp->key) != OVOID)
324 return 1;
325
326 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
327 error(WARNING, errmsg);
328 return 0;
329 }
330
331 // Prepare output channels and return # completed rows
332 int
333 RcontribSimulManager::PrepOutput()
334 {
335 if (!outList || !RtraceSimulManager::Ready()) {
336 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
337 return -1;
338 }
339 if (!cdsF) {
340 error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
341 return -1;
342 }
343 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
344 return -1;
345
346 int remWarnings = 20;
347 for (RcontribOutput *op = outList; op; op = op->next) {
348 if (op->rData) {
349 error(INTERNAL, "output channel already open in PrepOutput()");
350 return -1;
351 }
352 op->nRows = yres * (xres + !xres);
353 op->rData = (*cdsF)(op->ofname, outOp,
354 GetHeadLen()+1024 + op->nRows*op->rowBytes);
355 freeqstr(op->ofname); op->ofname = NULL;
356 if (outOp == RCOrecover) {
357 int rd = op->CheckHeader(this);
358 if (rd < 0)
359 return -1;
360 if (rd >= op->nRows) {
361 if (remWarnings >= 0) {
362 sprintf(errmsg, "recovered output '%s' already done",
363 op->GetName());
364 error(WARNING, remWarnings ? errmsg : "etc...");
365 remWarnings--;
366 }
367 rd = op->nRows;
368 }
369 if (!rInPos | (rInPos > rd)) rInPos = rd;
370 } else if (!op->NewHeader(this))
371 return -1;
372 // make sure there's room
373 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
374 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
375 return -1; // calls error() for us
376 }
377 rowsDone.NewBitMap(outList->nRows); // create row completion map
378 rowsDone.ClearBits(0, rInPos, true);
379 return rInPos;
380 }
381
382 // Create header in open write-only channel
383 bool
384 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
385 {
386 int headlim = rcp->GetHeadLen() + 1024;
387 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
388 int esiz;
389 const int etyp = rcp->GetFormat(&esiz);
390
391 strcpy(hdr, HDRSTR);
392 strcat(hdr, "RADIANCE\n");
393 begData = strlen(hdr);
394 strcpy(hdr+begData, rcp->GetHeadStr());
395 begData += rcp->GetHeadLen();
396 if (omod) {
397 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
398 begData += strlen(hdr+begData);
399 }
400 if (obin >= 0) {
401 sprintf(hdr+begData, "BIN=%d\n", obin);
402 begData += strlen(hdr+begData);
403 }
404 strcpy(hdr+begData, ROWZEROSTR);
405 rowCountPos = begData+LNROWSTR;
406 begData += sizeof(ROWZEROSTR)-1;
407 if (!rcp->xres | (rowBytes > esiz)) {
408 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
409 begData += strlen(hdr+begData);
410 }
411 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
412 begData += strlen(hdr+begData);
413 if (NCSAMP > 3) {
414 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
415 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
416 begData += strlen(hdr+begData);
417 }
418 if (etyp != 'c') {
419 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
420 begData += strlen(hdr+begData);
421 }
422 int align = 0;
423 switch (etyp) {
424 case 'f':
425 align = sizeof(float);
426 break;
427 case 'd':
428 align = sizeof(double);
429 break;
430 case 'c':
431 break;
432 default:
433 error(INTERNAL, "unsupported data type in NewHeader()");
434 return false;
435 }
436 strcpy(hdr+begData, FMTSTR); // write format string
437 begData += strlen(hdr+begData);
438 strcpy(hdr+begData, formstr(etyp));
439 begData += strlen(hdr+begData);
440 if (align) // align data at end of header
441 while ((begData+2) % align)
442 hdr[begData++] = ' ';
443 hdr[begData++] = '\n'; // EOL for data format
444 hdr[begData++] = '\n'; // end of nominal header
445 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
446 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
447 begData += strlen(hdr+begData);
448 }
449 return rData->ReleaseMemory(hdr, RDSwrite);
450 }
451
452 // find string argument in header for named variable
453 static const char *
454 findArgs(const char *hdr, const char *vnm, int len)
455 {
456 const char * npos = strnstr(hdr, vnm, len);
457
458 if (!npos) return NULL;
459
460 npos += strlen(vnm); // find start of (first) argument
461 len -= npos - hdr;
462 while (len-- > 0 && (*npos == '=') | isspace(*npos))
463 if (*npos++ == '\n')
464 return NULL;
465
466 return (len >= 0) ? npos : NULL;
467 }
468
469 // Load and check header in read/write channel
470 int
471 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
472 {
473 int esiz;
474 const int etyp = rcp->GetFormat(&esiz);
475 const int maxlen = rcp->GetHeadLen() + 1024;
476 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
477 const char * cp;
478 // find end of header
479 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
480 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
481 error(USER, errmsg);
482 return -1;
483 }
484 begData = cp - hdr + 1; // increment again at end
485 // check # components
486 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
487 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
488 error(USER, errmsg);
489 return -1;
490 }
491 // check format
492 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
493 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
494 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
495 error(USER, errmsg);
496 return -1;
497 }
498 // check #columns
499 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
500 !rcp->xres | (rowBytes > esiz)) {
501 sprintf(errmsg, "expected NCOLS=%d in '%s'",
502 int(rowBytes/esiz), GetName());
503 error(USER, errmsg);
504 return -1;
505 }
506 // find row count
507 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
508 sprintf(errmsg, "missing NROWS in '%s'", GetName());
509 error(USER, errmsg);
510 return -1;
511 }
512 rowCountPos = cp - hdr;
513 int rlast = atoi(cp);
514 begData++; // advance past closing EOL
515 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
516 char rbuf[64];
517 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
518 int rlen = strlen(rbuf);
519 if (strncmp(rbuf, hdr+begData, rlen)) {
520 sprintf(errmsg, "bad resolution string in '%s'", GetName());
521 error(USER, errmsg);
522 return -1;
523 }
524 begData += rlen;
525 }
526 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
527 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
528 }
529
530 // Rewind calculation (previous results unchanged)
531 bool
532 RcontribSimulManager::ResetRow(int r)
533 {
534 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
535 error(WARNING, "ignoring bad call to ResetRow()");
536 return (r == rInPos);
537 }
538 FlushQueue(); // finish current and reset
539 for (RcontribOutput *op = outList; op; op = op->next)
540 if (!op->SetRowsDone(r))
541 return false;
542
543 rowsDone.ClearBits(r, rInPos-r, false);
544 rInPos = r;
545 return true;
546 }
547
548 // call-back for averaging each modifier's contributions to assigned channel(s)
549 static int
550 putModContrib(const LUENT *lp, void *p)
551 {
552 RcontribMod * mp = (RcontribMod *)lp->data;
553 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
554 const double sca = 1./rcp->accum;
555 const DCOLORV * dvp = mp->cbin;
556 RcontribOutput * op = mp->opl;
557 int i, n;
558
559 switch (rcp->GetFormat()) { // conversion based on output type
560 case 'd': {
561 double * dvo = (double *)op->InsertionP(mp->coffset);
562 for (n = mp->nbins; n--; ) {
563 for (i = 0; i < NCSAMP; i++)
564 *dvo++ = *dvp++ * sca;
565 if ((op->obin >= 0) & (n > 0))
566 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
567 }
568 } break;
569 case 'f': {
570 float * fvo = (float *)op->InsertionP(mp->coffset);
571 for (n = mp->nbins; n--; ) {
572 for (i = 0; i < NCSAMP; i++)
573 *fvo++ = float(*dvp++ * sca);
574 if ((op->obin >= 0) & (n > 0))
575 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
576 }
577 } break;
578 case 'c': {
579 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
580 for (n = mp->nbins; n--; ) {
581 SCOLOR scol;
582 for (i = 0; i < NCSAMP; i++)
583 scol[i] = COLORV(*dvp++ * sca);
584 scolor_scolr(cvo, scol);
585 cvo += LSCOLR;
586 if ((op->obin >= 0) & (n > 0))
587 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
588 }
589 } break;
590 default:
591 error(CONSISTENCY, "unsupported output type in sendModContrib()");
592 return -1;
593 }
594 // clear for next tally
595 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
596 return 1;
597 }
598
599 // Add a ray/bundle to compute next record
600 int
601 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
602 {
603 if (!Ready())
604 return 0;
605 if (rInPos >= outList->nRows) {
606 error(WARNING, "ComputeRecord() called after last record");
607 return 0;
608 }
609 if (nkids > 0) { // in parent process?
610 int k = GetChild(); // updates output rows
611 if (k < 0) return -1; // can't really happen
612 RowAssignment rass;
613 rass.row = kidRow[k] = rInPos++;
614 rass.ac = accum;
615 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
616 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
617 != sizeof(FVECT)*2*accum) {
618 error(SYSTEM, "cannot write to child; dead process?");
619 return -1;
620 }
621 return accum; // tracing/output happens in child
622 }
623 if (NCSAMP != nChan) {
624 error(INTERNAL, "number of color channels must remain fixed");
625 return -1;
626 }
627 // actual work is done here...
628 if (EnqueueBundle(orig_direc, accum) < 0)
629 return -1;
630
631 RcontribOutput * op; // prepare output buffers
632 for (op = outList; op; op = op->next)
633 if (!op->GetRow(rInPos))
634 return -1;
635 // convert averages & clear
636 if (lu_doall(&modLUT, putModContrib, this) < 0)
637 return -1;
638 // write buffers
639 for (op = outList; op; op = op->next)
640 op->DoneRow();
641
642 if (!nkids) // update row if solo process
643 UpdateRowsDone(rInPos++); // XXX increment here is critical
644
645 return accum;
646 }
647
648 // Get next available child, returning index or -1 if forceWait & idling
649 int
650 RcontribSimulManager::GetChild(bool forceWait)
651 {
652 if (nkids <= 0)
653 return -1;
654 // take inventory
655 int pn, n = 0;
656 fd_set writeset, errset;
657 FD_ZERO(&writeset); FD_ZERO(&errset);
658 for (pn = nkids; pn--; ) {
659 if (kidRow[pn] < 0) { // child already ready?
660 if (forceWait) continue;
661 return pn; // good enough
662 }
663 FD_SET(kid[pn].w, &writeset); // will check on this one
664 FD_SET(kid[pn].w, &errset);
665 if (kid[pn].w >= n)
666 n = kid[pn].w + 1;
667 }
668 if (!n) // every child is idle?
669 return -1;
670 // wait on "busy" child(ren)
671 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
672 if (errno != EINTR) {
673 error(SYSTEM, "select call failed in GetChild()");
674 return -1;
675 }
676 pn = -1; // get flags set by select
677 for (n = nkids; n--; )
678 if (kidRow[n] >= 0 &&
679 FD_ISSET(kid[n].w, &writeset) |
680 FD_ISSET(kid[n].w, &errset)) {
681 // update output row counts
682 if (!FD_ISSET(kid[n].w, &errset))
683 UpdateRowsDone(kidRow[n]);
684 kidRow[n] = -1; // flag it available
685 pn = n;
686 }
687 return pn;
688 }
689
690 // Update row completion bitmap and update outputs (does not change rInPos)
691 bool
692 RcontribSimulManager::UpdateRowsDone(int r)
693 {
694 if (!rowsDone.TestAndSet(r)) {
695 error(WARNING, "redundant call to UpdateRowsDone()");
696 return false;
697 }
698 int nDone = GetRowFinished();
699 if (nDone <= r)
700 return true; // nothing to update, yet
701 for (RcontribOutput *op = outList; op; op = op->next)
702 if (!op->SetRowsDone(nDone))
703 return false;
704 return true; // up-to-date
705 }
706
707 // Run rcontrib child process (never returns)
708 void
709 RcontribSimulManager::RunChild()
710 {
711 FVECT * vecList = NULL;
712 RowAssignment rass;
713 ssize_t nr;
714
715 accum = 0;
716 errno = 0;
717 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
718 if (!rass.ac) {
719 error(CONSISTENCY, "bad accumulator count in child");
720 exit(1);
721 }
722 if (rass.ac > accum)
723 vecList = (FVECT *)erealloc(vecList,
724 sizeof(FVECT)*2*rass.ac);
725 accum = rass.ac;
726 rInPos = rass.row;
727
728 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
729 sizeof(FVECT)*2*accum)
730 break;
731
732 if (ComputeRecord(vecList) <= 0)
733 exit(1);
734 }
735 if (nr) {
736 error(SYSTEM, "read error in child process");
737 exit(1);
738 }
739 exit(0);
740 }
741
742 // Add child processes as indicated
743 bool
744 RcontribSimulManager::StartKids(int n2go)
745 {
746 if ((n2go <= 0) | (nkids + n2go <= 1))
747 return false;
748
749 if (!nkids)
750 cow_memshare(); // preload objects
751
752 n2go += nkids; // => desired brood size
753 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
754 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
755
756 fflush(stdout); // shouldn't use, anyway
757 while (nkids < n2go) {
758 kid[nkids] = sp_inactive;
759 kid[nkids].w = dup(1);
760 kid[nkids].flags |= PF_FILT_OUT;
761 int rv = open_process(&kid[nkids], NULL);
762 if (!rv) { // in child process?
763 while (nkids-- > 0)
764 close(kid[nkids].w);
765 free(kid); free(kidRow);
766 kid = NULL; kidRow = NULL;
767 RunChild(); // should never return
768 _exit(1);
769 }
770 if (rv < 0) {
771 error(SYSTEM, "cannot fork worker process");
772 return false;
773 }
774 kidRow[nkids++] = -1; // newborn is ready
775 }
776 return true;
777 }
778
779 // Reap the indicated number of children (all if 0)
780 int
781 RcontribSimulManager::StopKids(int n2end)
782 {
783 if (nkids <= 0)
784 return 0;
785 FlushQueue();
786 int status = 0;
787 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
788 status = close_processes(kid, nkids);
789 free(kid); free(kidRow);
790 kid = NULL; kidRow = NULL;
791 nkids = 0;
792 } else { // else shrink family
793 int st;
794 while (n2end-- > 0)
795 if ((st = close_process(&kid[--nkids])) > 0)
796 status = st;
797 // could call realloc(), but it hardly matters
798 }
799 if (status) {
800 sprintf(errmsg, "non-zero (%d) status from child", status);
801 error(WARNING, errmsg);
802 }
803 return status;
804 }
805
806 // Set number of computation threads (0 => #cores)
807 int
808 RcontribSimulManager::SetThreadCount(int nt)
809 {
810 if (!Ready()) {
811 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
812 return 0;
813 }
814 if (nt < 0)
815 return NThreads();
816 if (!nt) nt = GetNCores();
817 int status = 0;
818 if (nt == 1)
819 status = StopKids();
820 else if (nt < nkids)
821 status = StopKids(nkids-nt);
822 else if (nt > nkids)
823 StartKids(nt-nkids);
824 if (status) {
825 sprintf(errmsg, "non-zero (%d) status from child", status);
826 error(WARNING, errmsg);
827 }
828 return NThreads();
829 }