ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.6
Committed: Wed Nov 6 18:28:52 2024 UTC (5 months, 3 weeks ago) by greg
Branch: MAIN
Changes since 2.5: +3 -3 lines
Log Message:
perf(rxtrace,rxpict,rxpiece,rxcontrib): Improved exit strategy in children

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.5 2024/11/01 23:05:01 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 int contrib = 0; // computing contributions?
28
29 int xres = 0; // horizontal (scan) size
30 int yres = 0; // vertical resolution
31
32 // new/exclusive, overwrite if exists, or recover data
33 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
34 RDSread|RDSwrite};
35
36 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
37 #define LNROWSTR 6
38
39 // Modifier channel for recording contributions (no constructor/destructor)
40 struct RcontribMod {
41 RcontribOutput * opl; // pointer to first output channel
42 char * params; // parameters string
43 EPNODE * binv; // bin expression (NULL if 1 bin)
44 int nbins; // bin count this modifier
45 int coffset; // column offset in bytes
46 DCOLORV cbin[1]; // bin accumulator (extends struct)
47 /// Access specific (spectral) color bin
48 DCOLORV * operator[](int n) {
49 if ((n < 0) | (n >= nbins)) return NULL;
50 return cbin + n*NCSAMP;
51 }
52 };
53
54 // Struct used to assign record calculation to child
55 struct RowAssignment {
56 uint32 row; // row to do
57 uint32 ac; // accumulation count
58 };
59
60 // Our default data share function
61 RdataShare *
62 defDataShare(const char *name, RCOutputOp op, size_t siz)
63 {
64 return new RdataShareMap(name, RSDOflags[op], siz);
65 }
66
67 // Allocate rcontrib accumulator
68 RcontribMod *
69 NewRcMod(const char *prms, const char *binexpr, int ncbins)
70 {
71 if (ncbins <= 0) return NULL;
72 if (!prms) prms = "";
73 if (!binexpr & (ncbins > 1)) {
74 error(INTERNAL, "missing bin expression");
75 return NULL;
76 }
77 if (ncbins == 1) { // shouldn't have bin expression?
78 if (binexpr && strcmp(binexpr, "0"))
79 error(WARNING, "ignoring non-zero expression for single bin");
80 prms = "";
81 binexpr = NULL;
82 }
83 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
84 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
85 strlen(prms)+1);
86
87 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
88 if (binexpr) {
89 mp->binv = eparse(const_cast<char *>(binexpr));
90 CHECK(mp->binv->type==NUM, WARNING, "constant bin expression");
91 }
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Set output format ('f', 'd', or 'c')
107 bool
108 RcontribSimulManager::SetDataFormat(int ty)
109 {
110 if (outList) {
111 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
112 return false;
113 }
114 switch (ty) {
115 case 'f':
116 dsiz = sizeof(float)*NCSAMP;
117 break;
118 case 'd':
119 dsiz = sizeof(double)*NCSAMP;
120 break;
121 case 'c':
122 dsiz = LSCOLR;
123 break;
124 default:
125 sprintf(errmsg, "unsupported output format '%c'", ty);
126 error(INTERNAL, errmsg);
127 return false;
128 }
129 dtyp = ty;
130 return true;
131 }
132
133 // static call-back for rcontrib ray-tracing
134 int
135 RcontribSimulManager::RctCall(RAY *r, void *cd)
136 {
137 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
138 return 0;
139 // shadow ray not on source?
140 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
141 return 0;
142
143 const char * mname = objptr(r->ro->omod)->oname;
144 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
145 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
146 if (!mp)
147 return 0; // not in our modifier list
148
149 int bi = 0; // get bin index
150 if (mp->binv) {
151 worldfunc(RCCONTEXT, r);
152 set_eparams(mp->params);
153 double bval = evalue(mp->binv);
154 if (bval <= -.5)
155 return 0; // silently ignore negative bin index
156 bi = int(bval + .5);
157 }
158 DCOLORV * dvp = (*mp)[bi];
159 if (!dvp) {
160 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
161 error(WARNING, errmsg);
162 return 0;
163 }
164 SCOLOR contr;
165 raycontrib(contr, r, PRIMARY); // compute coefficient
166 if (contrib)
167 smultscolor(contr, r->rcol); // -> value contribution
168 for (int i = 0; i < NCSAMP; i++)
169 *dvp++ += contr[i]; // accumulate color/spectrum
170 return 1;
171 }
172
// Look for the first '%' conversion in tst whose conversion character
// is one of those listed in match.  Returns the index of that
// conversion character within tst, or 0 if there is none.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
					// find the next '%'
		while (*cp && *cp != '%')
			cp++;
		if (!*cp)
			return 0;	// ran off the end
		cp++;
					// skip flags/width up to conversion char
		while (strchr(convChars, *cp) == NULL)
			cp++;
		if (!*cp)
			return 0;	// unterminated conversion
		if (strchr(match, *cp) != NULL)
			return int(cp - tst);
		cp++;			// some other conversion; keep looking
	}
}
190
191 // Add a modifier and arguments, opening output file(s)
192 bool
193 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
194 const char *prms, const char *binval, int bincnt)
195 {
196 if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
197 error(WARNING, "ignoring bad call to AddModifier()");
198 return false;
199 }
200 if (!nChan) { // initial call?
201 if (!SetDataFormat(dtyp))
202 return false;
203 nChan = NCSAMP;
204 } else if (nChan != NCSAMP) {
205 error(INTERNAL, "number of spectral channels must be fixed");
206 return false;
207 }
208 if (Ready()) {
209 error(INTERNAL, "call to AddModifier() after PrepOutput()");
210 return false;
211 }
212 LUENT * lp = lu_find(&modLUT, modn);
213 if (lp->data) {
214 sprintf(errmsg, "duplicate modifier '%s'", modn);
215 error(USER, errmsg);
216 return false;
217 }
218 if (!lp->key) // create new entry
219 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
220
221 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
222 if (!mp) return false;
223 lp->data = (char *)mp;
224 const int modndx = hasFormat(outspec, "sb");
225 const int binndx = hasFormat(outspec, "diouxX");
226 int bin0 = 0;
227 char fnbuf[512];
228 if (!modndx | (modndx > binndx))
229 sprintf(fnbuf, outspec, bin0, modn);
230 else
231 sprintf(fnbuf, outspec, modn, bin0);
232 RcontribOutput * olast = NULL;
233 RcontribOutput * op;
234 for (op = outList; op; op = (olast=op)->next) {
235 if (strcmp(op->GetName(), fnbuf))
236 continue;
237 if (modndx) { // this ain't right
238 sprintf(errmsg, "output name collision for '%s'", fnbuf);
239 error(USER, errmsg);
240 return false;
241 }
242 if ((binndx > 0) & (xres > 0)) {
243 sprintf(fnbuf, outspec, ++bin0);
244 continue; // each bin goes to another image
245 }
246 mp->coffset = op->rowBytes; // else add to what's there
247 break;
248 }
249 if (!op) { // create new output channel?
250 op = new RcontribOutput(fnbuf);
251 if (olast) olast->next = op;
252 else outList = op;
253 }
254 mp->opl = op; // first (maybe only) output channel
255 if (modndx) // remember modifier if part of name
256 op->omod = lp->key;
257 if (binndx > 0) { // append output image/bin list
258 op->rowBytes += dsiz;
259 op->obin = bin0;
260 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
261 if (!modndx | (modndx > binndx))
262 sprintf(fnbuf, outspec, bin0+bincnt, modn);
263 else
264 sprintf(fnbuf, outspec, modn, bin0+bincnt);
265 if (!op->next) {
266 olast = op;
267 olast->next = op = new RcontribOutput(fnbuf);
268 if (modndx) op->omod = lp->key;
269 op->obin = bin0+bincnt;
270 } else {
271 op = op->next;
272 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
273 "bin number mismatch in AddModifier()");
274 }
275 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
276 "row offset mismatch in AddModifier()");
277 op->rowBytes += dsiz;
278 }
279 } else // else send all results to this channel
280 op->rowBytes += bincnt*dsiz;
281 return true;
282 }
283
284 // Add a file of modifiers with associated arguments
285 bool
286 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
287 const char *prms,
288 const char *binval, int bincnt)
289 {
290 char * path = getpath(const_cast<char *>(modfn),
291 getrlibpath(), R_OK);
292 FILE * fp;
293 if (!path || !(fp = fopen(path, "r"))) {
294 sprintf(errmsg, "cannot %s modifier file '%s'",
295 path ? "open" : "find", modfn);
296 error(SYSTEM, errmsg);
297 return false;
298 }
299 char mod[MAXSTR];
300 while (fgetword(mod, sizeof(mod), fp))
301 if (!AddModifier(mod, outspec, prms, binval, bincnt))
302 return false;
303 fclose(fp);
304 return true;
305 }
306
307 // call-back to check if modifier has been loaded
308 static int
309 checkModExists(const LUENT *lp, void *p)
310 {
311 if (modifier(lp->key) != OVOID)
312 return 1;
313
314 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
315 error(WARNING, errmsg);
316 return 0;
317 }
318
319 // Prepare output channels and return # completed rows
320 int
321 RcontribSimulManager::PrepOutput()
322 {
323 if (!outList || !RtraceSimulManager::Ready()) {
324 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
325 return -1;
326 }
327 if (!cdsF) {
328 error(INTERNAL, "missing RdataShare constructor call (*cdsF)");
329 return -1;
330 }
331 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
332 return -1;
333
334 int remWarnings = 20;
335 for (RcontribOutput *op = outList; op; op = op->next) {
336 if (op->rData) {
337 error(INTERNAL, "output channel already open in PrepOutput()");
338 return -1;
339 }
340 op->nRows = yres * (xres + !xres);
341 op->rData = (*cdsF)(op->ofname, outOp,
342 GetHeadLen()+1024 + op->nRows*op->rowBytes);
343 freeqstr(op->ofname); op->ofname = NULL;
344 if (outOp == RCOrecover) {
345 int rd = op->CheckHeader(this);
346 if (rd < 0)
347 return -1;
348 if (rd >= op->nRows) {
349 if (remWarnings >= 0) {
350 sprintf(errmsg, "recovered output '%s' already done",
351 op->GetName());
352 error(WARNING, remWarnings ? errmsg : "etc...");
353 remWarnings--;
354 }
355 rd = op->nRows;
356 }
357 if (!rInPos | (rInPos > rd)) rInPos = rd;
358 } else if (!op->NewHeader(this))
359 return -1;
360 // make sure there's room
361 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
362 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
363 return -1; // calls error() for us
364 }
365 rowsDone.NewBitMap(outList->nRows); // create row completion map
366 rowsDone.ClearBits(0, rInPos, true);
367 return rInPos;
368 }
369
370 // Create header in open write-only channel
371 bool
372 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
373 {
374 int headlim = rcp->GetHeadLen() + 1024;
375 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
376 int esiz;
377 const int etyp = rcp->GetFormat(&esiz);
378
379 strcpy(hdr, HDRSTR);
380 strcat(hdr, "RADIANCE\n");
381 begData = strlen(hdr);
382 strcpy(hdr+begData, rcp->GetHeadStr());
383 begData += rcp->GetHeadLen();
384 if (omod) {
385 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
386 begData += strlen(hdr+begData);
387 }
388 if (obin >= 0) {
389 sprintf(hdr+begData, "BIN=%d\n", obin);
390 begData += strlen(hdr+begData);
391 }
392 strcpy(hdr+begData, ROWZEROSTR);
393 rowCountPos = begData+LNROWSTR;
394 begData += sizeof(ROWZEROSTR)-1;
395 if (!xres | (rowBytes > esiz)) {
396 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
397 begData += strlen(hdr+begData);
398 }
399 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
400 begData += strlen(hdr+begData);
401 if (NCSAMP > 3) {
402 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
403 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
404 begData += strlen(hdr+begData);
405 }
406 if (etyp != 'c') {
407 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
408 begData += strlen(hdr+begData);
409 }
410 int align = 0;
411 switch (etyp) {
412 case 'f':
413 align = sizeof(float);
414 break;
415 case 'd':
416 align = sizeof(double);
417 break;
418 case 'c':
419 break;
420 default:
421 error(INTERNAL, "unsupported data type in NewHeader()");
422 return false;
423 }
424 strcpy(hdr+begData, FMTSTR); // write format string
425 begData += strlen(hdr+begData);
426 strcpy(hdr+begData, formstr(etyp));
427 begData += strlen(hdr+begData);
428 if (align) // align data at end of header
429 while ((begData+2) % align)
430 hdr[begData++] = ' ';
431 hdr[begData++] = '\n'; // EOL for data format
432 hdr[begData++] = '\n'; // end of nominal header
433 if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
434 sprintf(hdr+begData, PIXSTDFMT, yres, xres);
435 begData += strlen(hdr+begData);
436 }
437 return rData->ReleaseMemory(hdr, RDSwrite);
438 }
439
440 // find string argument in header for named variable
441 static const char *
442 findArgs(const char *hdr, const char *vnm, int len)
443 {
444 const char * npos = strnstr(hdr, vnm, len);
445
446 if (!npos) return NULL;
447
448 npos += strlen(vnm); // find start of (first) argument
449 len -= npos - hdr;
450 while (len-- > 0 && (*npos == '=') | isspace(*npos))
451 if (*npos++ == '\n')
452 return NULL;
453
454 return (len >= 0) ? npos : NULL;
455 }
456
457 // Load and check header in read/write channel
458 int
459 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
460 {
461 int esiz;
462 const int etyp = rcp->GetFormat(&esiz);
463 const int maxlen = rcp->GetHeadLen() + 1024;
464 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
465 const char * cp;
466 // find end of header
467 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
468 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
469 error(USER, errmsg);
470 return -1;
471 }
472 begData = cp - hdr + 1; // increment again at end
473 // check # components
474 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
475 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
476 error(USER, errmsg);
477 return -1;
478 }
479 // check format
480 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
481 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
482 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
483 error(USER, errmsg);
484 return -1;
485 }
486 // check #columns
487 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
488 !xres | (rowBytes > esiz)) {
489 sprintf(errmsg, "expected NCOLS=%d in '%s'",
490 int(rowBytes/esiz), GetName());
491 error(USER, errmsg);
492 return -1;
493 }
494 // find row count
495 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
496 sprintf(errmsg, "missing NROWS in '%s'", GetName());
497 error(USER, errmsg);
498 return -1;
499 }
500 rowCountPos = cp - hdr;
501 int rlast = atoi(cp);
502 begData++; // advance past closing EOL
503 if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
504 char rbuf[64];
505 sprintf(rbuf, PIXSTDFMT, yres, xres);
506 int rlen = strlen(rbuf);
507 if (strncmp(rbuf, hdr+begData, rlen)) {
508 sprintf(errmsg, "bad resolution string in '%s'", GetName());
509 error(USER, errmsg);
510 return -1;
511 }
512 begData += rlen;
513 }
514 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
515 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
516 }
517
518 // Rewind calculation (previous results unchanged)
519 bool
520 RcontribSimulManager::ResetRow(int r)
521 {
522 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
523 error(WARNING, "ignoring bad call to ResetRow()");
524 return (r == rInPos);
525 }
526 FlushQueue(); // finish current and reset
527 for (RcontribOutput *op = outList; op; op = op->next)
528 if (!op->SetRowsDone(r))
529 return false;
530
531 rowsDone.ClearBits(r, rInPos-r, false);
532 rInPos = r;
533 return true;
534 }
535
536 // call-back for averaging each modifier's contributions to assigned channel(s)
537 static int
538 putModContrib(const LUENT *lp, void *p)
539 {
540 RcontribMod * mp = (RcontribMod *)lp->data;
541 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
542 const double sca = 1./rcp->accum;
543 const DCOLORV * dvp = mp->cbin;
544 RcontribOutput * op = mp->opl;
545 int i, n;
546
547 switch (rcp->GetFormat()) { // conversion based on output type
548 case 'd': {
549 double * dvo = (double *)op->InsertionP(mp->coffset);
550 for (n = mp->nbins; n--; ) {
551 for (i = 0; i < NCSAMP; i++)
552 *dvo++ = *dvp++ * sca;
553 if ((op->obin >= 0) & (n > 0))
554 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
555 }
556 } break;
557 case 'f': {
558 float * fvo = (float *)op->InsertionP(mp->coffset);
559 for (n = mp->nbins; n--; ) {
560 for (i = 0; i < NCSAMP; i++)
561 *fvo++ = float(*dvp++ * sca);
562 if ((op->obin >= 0) & (n > 0))
563 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
564 }
565 } break;
566 case 'c': {
567 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
568 for (n = mp->nbins; n--; ) {
569 SCOLOR scol;
570 for (i = 0; i < NCSAMP; i++)
571 scol[i] = COLORV(*dvp++ * sca);
572 scolor_scolr(cvo, scol);
573 cvo += LSCOLR;
574 if ((op->obin >= 0) & (n > 0))
575 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
576 }
577 } break;
578 default:
579 error(CONSISTENCY, "unsupported output type in sendModContrib()");
580 return -1;
581 }
582 // clear for next tally
583 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
584 return 1;
585 }
586
587 // Add a ray/bundle to compute next record
588 int
589 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
590 {
591 if (!Ready())
592 return 0;
593 if (rInPos >= outList->nRows) {
594 error(WARNING, "ComputeRecord() called after last record");
595 return 0;
596 }
597 if (nkids > 0) { // in parent process?
598 int k = GetChild(); // updates output rows
599 if (k < 0) return -1; // can't really happen
600 RowAssignment rass;
601 rass.row = kidRow[k] = rInPos++;
602 rass.ac = accum;
603 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
604 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
605 != sizeof(FVECT)*2*accum) {
606 error(SYSTEM, "cannot write to child; dead process?");
607 return -1;
608 }
609 return accum; // tracing/output happens in child
610 }
611 if (NCSAMP != nChan) {
612 error(INTERNAL, "number of color channels must remain fixed");
613 return -1;
614 }
615 // actual work is done here...
616 if (EnqueueBundle(orig_direc, accum) < 0)
617 return -1;
618
619 RcontribOutput * op; // prepare output buffers
620 for (op = outList; op; op = op->next)
621 if (!op->GetRow(rInPos))
622 return -1;
623 // convert averages & clear
624 if (lu_doall(&modLUT, putModContrib, this) < 0)
625 return -1;
626 // write buffers
627 for (op = outList; op; op = op->next)
628 op->DoneRow();
629
630 if (!nkids) // update row if solo process
631 UpdateRowsDone(rInPos++); // XXX increment here is critical
632
633 return accum;
634 }
635
636 // Get next available child, returning index or -1 if forceWait & idling
637 int
638 RcontribSimulManager::GetChild(bool forceWait)
639 {
640 if (nkids <= 0)
641 return -1;
642 // take inventory
643 int pn, n = 0;
644 fd_set writeset, errset;
645 FD_ZERO(&writeset); FD_ZERO(&errset);
646 for (pn = nkids; pn--; ) {
647 if (kidRow[pn] < 0) { // child already ready?
648 if (forceWait) continue;
649 return pn; // good enough
650 }
651 FD_SET(kid[pn].w, &writeset); // will check on this one
652 FD_SET(kid[pn].w, &errset);
653 if (kid[pn].w >= n)
654 n = kid[pn].w + 1;
655 }
656 if (!n) // every child is idle?
657 return -1;
658 // wait on "busy" child(ren)
659 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
660 if (errno != EINTR) {
661 error(SYSTEM, "select call failed in GetChild()");
662 return -1;
663 }
664 pn = -1; // get flags set by select
665 for (n = nkids; n--; )
666 if (kidRow[n] >= 0 &&
667 FD_ISSET(kid[n].w, &writeset) |
668 FD_ISSET(kid[n].w, &errset)) {
669 // update output row counts
670 if (!FD_ISSET(kid[n].w, &errset))
671 UpdateRowsDone(kidRow[n]);
672 kidRow[n] = -1; // flag it available
673 pn = n;
674 }
675 return pn;
676 }
677
678 // Update row completion bitmap and update outputs (does not change rInPos)
679 bool
680 RcontribSimulManager::UpdateRowsDone(int r)
681 {
682 if (!rowsDone.TestAndSet(r)) {
683 error(WARNING, "redundant call to UpdateRowsDone()");
684 return false;
685 }
686 int nDone = GetRowFinished();
687 if (nDone <= r)
688 return true; // nothing to update, yet
689 for (RcontribOutput *op = outList; op; op = op->next)
690 if (!op->SetRowsDone(nDone))
691 return false;
692 return true; // up-to-date
693 }
694
695 // Run rcontrib child process (never returns)
696 void
697 RcontribSimulManager::RunChild()
698 {
699 FVECT * vecList = NULL;
700 RowAssignment rass;
701 ssize_t nr;
702
703 accum = 0;
704 errno = 0;
705 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
706 if (!rass.ac) {
707 error(CONSISTENCY, "bad accumulator count in child");
708 exit(1);
709 }
710 if (rass.ac > accum)
711 vecList = (FVECT *)erealloc(vecList,
712 sizeof(FVECT)*2*rass.ac);
713 accum = rass.ac;
714 rInPos = rass.row;
715
716 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
717 sizeof(FVECT)*2*accum)
718 break;
719
720 if (ComputeRecord(vecList) <= 0)
721 exit(1);
722 }
723 if (nr) {
724 error(SYSTEM, "read error in child process");
725 exit(1);
726 }
727 exit(0);
728 }
729
730 // Add child processes as indicated
731 bool
732 RcontribSimulManager::StartKids(int n2go)
733 {
734 if ((n2go <= 0) | (nkids + n2go <= 1))
735 return false;
736
737 if (!nkids)
738 cow_memshare(); // preload objects
739
740 n2go += nkids; // => desired brood size
741 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
742 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
743
744 fflush(stdout); // shouldn't use, anyway
745 while (nkids < n2go) {
746 kid[nkids] = sp_inactive;
747 kid[nkids].w = dup(1);
748 kid[nkids].flags |= PF_FILT_OUT;
749 int rv = open_process(&kid[nkids], NULL);
750 if (!rv) { // in child process?
751 while (nkids-- > 0)
752 close(kid[nkids].w);
753 free(kid); free(kidRow);
754 kid = NULL; kidRow = NULL;
755 RunChild(); // should never return
756 _exit(1);
757 }
758 if (rv < 0) {
759 error(SYSTEM, "cannot fork worker process");
760 return false;
761 }
762 kidRow[nkids++] = -1; // newborn is ready
763 }
764 return true;
765 }
766
767 // Reap the indicated number of children (all if 0)
768 int
769 RcontribSimulManager::StopKids(int n2end)
770 {
771 if (nkids <= 0)
772 return 0;
773 FlushQueue();
774 int status = 0;
775 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
776 status = close_processes(kid, nkids);
777 free(kid); free(kidRow);
778 kid = NULL; kidRow = NULL;
779 nkids = 0;
780 } else { // else shrink family
781 int st;
782 while (n2end-- > 0)
783 if ((st = close_process(&kid[--nkids])) > 0)
784 status = st;
785 // could call realloc(), but it hardly matters
786 }
787 if (status) {
788 sprintf(errmsg, "non-zero (%d) status from child", status);
789 error(WARNING, errmsg);
790 }
791 return status;
792 }
793
794 // Set number of computation threads (0 => #cores)
795 int
796 RcontribSimulManager::SetThreadCount(int nt)
797 {
798 if (!Ready()) {
799 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
800 return 0;
801 }
802 if (nt < 0)
803 return NThreads();
804 if (!nt) nt = GetNCores();
805 int status = 0;
806 if (nt == 1)
807 status = StopKids();
808 else if (nt < nkids)
809 status = StopKids(nkids-nt);
810 else if (nt > nkids)
811 StartKids(nt-nkids);
812 if (status) {
813 sprintf(errmsg, "non-zero (%d) status from child", status);
814 error(WARNING, errmsg);
815 }
816 return NThreads();
817 }