ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/src/rt/RcontribSimulManager.cpp
Revision: 2.14
Committed: Fri Oct 17 01:15:53 2025 UTC (2 days, 1 hour ago) by greg
Branch: MAIN
Changes since 2.13: +28 -18 lines
Log Message:
perf(rxcontrib): Added handshake to ensure each row is complete in children

File Contents

# Content
// RCS revision identifier embedded in the object file (omitted when 'lint' is defined)
#ifndef lint
static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.13 2025/10/16 23:14:31 greg Exp $";
#endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
// Function context name passed to worldfunc() before evaluating bin expressions
char RCCONTEXT[] = "RC.";

// Header strings defined elsewhere, used when writing output headers
extern const char HDRSTR[];
extern const char BIGEND[];
extern const char FMTSTR[];

// RdataShare flags indexed by RCOutputOp (see defDataShare()):
// new/exclusive, overwrite if exists, or recover data
int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
		RDSread|RDSwrite};

// Placeholder row-count line written into new headers by NewHeader();
// rowCountPos records where its digits start (presumably rewritten as
// rows complete -- the writer is not shown in this file, TODO confirm)
static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
#define LNROWSTR 6		// length of "NROWS=" prefix in ROWZEROSTR
33
// Modifier channel for recording contributions (no constructor/destructor).
// NewRcMod() allocates this as one zeroed block: the struct, NCSAMP*nbins
// DCOLORV accumulators starting at cbin[], then the params string.
struct RcontribMod {
	RcontribOutput *	opl;		// pointer to first output channel
	char *			params;		// parameters string (stored after cbin[])
	EPNODE *		binv;		// bin expression (NULL if none or constant 0)
	int			nbins;		// bin count this modifier
	int			coffset;	// column offset in bytes within output row
	DCOLORV			cbin[1];	// bin accumulator (extends struct)
	/// Access specific (spectral) color bin; NULL if n is out of range
	DCOLORV * operator[](int n) {
		if ((n < 0) | (n >= nbins)) return NULL;
		return cbin + n*NCSAMP;
	}
};

// Struct used to assign record calculation to child:
// written by the parent in ComputeRecord(), read by the child in RunChild()
struct RowAssignment {
	uint32	row;		// row to do
	uint32	ac;		// accumulation count (2*ac FVECTs follow on the pipe)
};

// Acknowledgement each child writes (including its NUL) after finishing a row
static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // Get format identifier
58 const char *
59 formstr(int f)
60 {
61 switch (f) {
62 case 'a': return("ascii");
63 case 'f': return("float");
64 case 'd': return("double");
65 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
66 }
67 return("unknown");
68 }
69
70 // Our default data share function
71 RdataShare *
72 defDataShare(const char *name, RCOutputOp op, size_t siz)
73 {
74 return new RdataShareMap(name, RSDOflags[op], siz);
75 }
76
77 // Allocate rcontrib accumulator
78 RcontribMod *
79 NewRcMod(const char *prms, const char *binexpr, int ncbins)
80 {
81 if (!prms) prms = "";
82 if ((ncbins > 1) & !binexpr) {
83 error(USER, "missing bin expression");
84 return NULL;
85 }
86 if (ncbins < 1) ncbins = 1;
87
88 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
89 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
90 strlen(prms)+1);
91
92 if (binexpr) { // get/check bin expression
93 mp->binv = eparse(const_cast<char *>(binexpr));
94 if (mp->binv->type == NUM) { // constant expression (0)?
95 if ((int)evalue(mp->binv) != 0) {
96 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
97 binexpr);
98 error(USER, errmsg);
99 }
100 if (ncbins > 1) {
101 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
102 error(USER, errmsg);
103 }
104 epfree(mp->binv, true);
105 mp->binv = NULL;
106 prms = "";
107 ncbins = 1;
108 }
109 }
110 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
111 mp->nbins = ncbins;
112 return mp;
113 }
114
115 // Free an RcontribMod
116 void
117 FreeRcMod(void *p)
118 {
119 if (!p) return;
120 EPNODE * bep = (*(RcontribMod *)p).binv;
121 if (bep) epfree(bep, true);
122 efree(p);
123 }
124
125 // Set output format ('f', 'd', or 'c')
126 bool
127 RcontribSimulManager::SetDataFormat(int ty)
128 {
129 if (outList) {
130 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
131 return false;
132 }
133 switch (ty) {
134 case 'f':
135 dsiz = sizeof(float)*NCSAMP;
136 break;
137 case 'd':
138 dsiz = sizeof(double)*NCSAMP;
139 break;
140 case 'c':
141 dsiz = LSCOLR;
142 break;
143 default:
144 sprintf(errmsg, "unsupported output format '%c'", ty);
145 error(INTERNAL, errmsg);
146 return false;
147 }
148 dtyp = ty;
149 return true;
150 }
151
152 // static call-back for rcontrib ray-tracing
153 int
154 RcontribSimulManager::RctCall(RAY *r, void *cd)
155 {
156 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
157 return 0;
158 // shadow ray not on source?
159 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
160 return 0;
161
162 const char * mname = objptr(r->ro->omod)->oname;
163 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
164 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
165 if (!mp)
166 return 0; // not in our modifier list
167
168 int bi = 0; // get bin index
169 if (mp->binv) {
170 worldfunc(RCCONTEXT, r);
171 set_eparams(mp->params);
172 double bval = evalue(mp->binv);
173 if (bval <= -.5)
174 return 0; // silently ignore negative bin index
175 bi = int(bval + .5);
176 }
177 DCOLORV * dvp = (*mp)[bi];
178 if (!dvp) {
179 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
180 error(WARNING, errmsg);
181 return 0;
182 }
183 SCOLOR contr;
184 raycontrib(contr, r, PRIMARY); // compute coefficient
185 if (rcp->HasFlag(RCcontrib))
186 smultscolor(contr, r->rcol); // -> value contribution
187
188 for (int i = 0; i < NCSAMP; i++)
189 *dvp++ += contr[i]; // accumulate color/spectrum
190 return 1;
191 }
192
// Find the first printf-style conversion in tst whose conversion character
// is in match; return that character's offset in tst, or 0 if there is none.
// Flags, width, precision and length modifiers are skipped; "%%" never matches.
static int
hasFormat(const char *tst, const char *match)
{
	static const char allPoss[] = "%diouxXfFeEgGaAcsb";
	const char *	cp = tst;

	while (*cp) {
		if (*cp++ != '%')
			continue;		// not the start of a conversion
		cp += strcspn(cp, allPoss);	// skip to conversion character
		if (!*cp)
			return 0;		// ran off end of string
		if (strchr(match, *cp))
			return int(cp - tst);
		cp++;
	}
	return 0;
}
210
211 // Add a modifier and arguments, opening output file(s)
212 bool
213 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
214 const char *prms, const char *binval, int bincnt)
215 {
216 if (!modn | !outspec || !*modn | !*outspec) {
217 error(WARNING, "ignoring bad call to AddModifier()");
218 return false;
219 }
220 if (*outspec == '!') {
221 error(USER, "command output not supported by RcontribSimulManager");
222 return false;
223 }
224 if (!nChan) { // initial call?
225 if ((xres < 0) | (yres <= 0)) {
226 error(USER, "xres, yres must be set before first modifier");
227 return false;
228 }
229 if (!SetDataFormat(dtyp))
230 return false;
231 nChan = NCSAMP;
232 } else if (nChan != NCSAMP) {
233 error(USER, "# spectral channels must be fixed in AddModifier()");
234 return false;
235 }
236 if (Ready()) {
237 error(INTERNAL, "call to AddModifier() after PrepOutput()");
238 return false;
239 }
240 LUENT * lp = lu_find(&modLUT, modn);
241 if (lp->data) {
242 sprintf(errmsg, "duplicate modifier '%s'", modn);
243 error(USER, errmsg);
244 return false;
245 }
246 if (!lp->key) // create new entry
247 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
248
249 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
250 if (!mp) return false;
251 lp->data = (char *)mp;
252 const int modndx = hasFormat(outspec, "sb");
253 const int binndx = hasFormat(outspec, "diouxX");
254 int bin0 = 0;
255 char fnbuf[512];
256 if (!modndx || (binndx > 0) & (modndx > binndx))
257 sprintf(fnbuf, outspec, bin0, modn);
258 else
259 sprintf(fnbuf, outspec, modn, bin0);
260 RcontribOutput * olast = NULL;
261 RcontribOutput * op;
262 for (op = outList; op; op = (olast=op)->next) {
263 if (strcmp(op->GetName(), fnbuf))
264 continue;
265 if (modndx) { // this ain't right
266 sprintf(errmsg, "output name collision for '%s'", fnbuf);
267 error(USER, errmsg);
268 return false;
269 }
270 if ((binndx > 0) & (xres > 0)) {
271 sprintf(fnbuf, outspec, ++bin0);
272 continue; // each bin goes to another image
273 }
274 mp->coffset = op->rowBytes; // else add to what's there
275 break;
276 }
277 if (!op) { // create new output channel?
278 op = new RcontribOutput(fnbuf);
279 if (olast) olast->next = op;
280 else outList = op;
281 }
282 mp->opl = op; // first (maybe only) output channel
283 if (modndx) // remember modifier if part of name
284 op->omod = lp->key;
285 if (binndx) { // append output image/bin list
286 op->rowBytes += dsiz;
287 op->obin = bin0;
288 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
289 if (!modndx || (binndx > 0) & (modndx > binndx))
290 sprintf(fnbuf, outspec, bin0+bincnt, modn);
291 else
292 sprintf(fnbuf, outspec, modn, bin0+bincnt);
293 if (!op->next) {
294 olast = op;
295 olast->next = op = new RcontribOutput(fnbuf);
296 if (modndx) op->omod = lp->key;
297 op->obin = bin0+bincnt;
298 } else {
299 op = op->next;
300 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
301 "bin number mismatch in AddModifier()");
302 }
303 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
304 "row offset mismatch in AddModifier()");
305 op->rowBytes += dsiz;
306 }
307 } else // else send all results to this channel
308 op->rowBytes += mp->nbins*dsiz;
309 return true;
310 }
311
312 // Add a file of modifiers with associated arguments
313 bool
314 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
315 const char *prms,
316 const char *binval, int bincnt)
317 {
318 char * path = getpath(const_cast<char *>(modfn),
319 getrlibpath(), R_OK);
320 FILE * fp;
321 if (!path || !(fp = fopen(path, "r"))) {
322 sprintf(errmsg, "cannot %s modifier file '%s'",
323 path ? "open" : "find", modfn);
324 error(SYSTEM, errmsg);
325 return false;
326 }
327 char mod[MAXSTR];
328 while (fgetword(mod, sizeof(mod), fp))
329 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
330 fclose(fp);
331 return false;
332 }
333 fclose(fp);
334 return true;
335 }
336
337 // call-back to check if modifier has been loaded
338 static int
339 checkModExists(const LUENT *lp, void *p)
340 {
341 OBJECT mod = modifier(lp->key);
342
343 if ((mod != OVOID) & (mod < nsceneobjs))
344 return 1;
345
346 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
347 error(WARNING, errmsg);
348 return 0;
349 }
350
351 // Prepare output channels and return # completed rows
352 int
353 RcontribSimulManager::PrepOutput()
354 {
355 if (!outList || !RtraceSimulManager::Ready()) {
356 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
357 return -1;
358 }
359 if (!cdsF) {
360 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
361 return -1;
362 }
363 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
364 return -1;
365
366 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
367 int remWarnings = 20;
368 for (RcontribOutput *op = outList; op; op = op->next) {
369 if (op->rData) {
370 error(INTERNAL, "output channel already open in PrepOutput()");
371 return -1;
372 }
373 op->nRows = outList->nRows;
374 op->rData = (*cdsF)(op->ofname, outOp,
375 GetHeadLen()+1024 + op->nRows*op->rowBytes);
376 freeqstr(op->ofname); op->ofname = NULL;
377 if (outOp == RCOrecover) {
378 int rd = op->CheckHeader(this);
379 if (rd < 0)
380 return -1;
381 if (rd >= op->nRows) {
382 if (remWarnings >= 0) {
383 sprintf(errmsg, "recovered output '%s' is complete",
384 op->GetName());
385 error(WARNING, --remWarnings ? errmsg : "etc...");
386 }
387 rd = op->nRows;
388 }
389 if (!rInPos | (rInPos > rd)) rInPos = rd;
390 } else if (!op->NewHeader(this))
391 return -1;
392 // make sure there's room
393 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
394 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
395 return -1; // calls error() for us
396 }
397 rowsDone.NewBitMap(outList->nRows); // create row completion map
398 rowsDone.ClearBits(0, rInPos, true);
399 return rInPos;
400 }
401
402 // Create header in open write-only channel
403 bool
404 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
405 {
406 int headlim = rcp->GetHeadLen() + 1024;
407 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
408 int esiz;
409 const int etyp = rcp->GetFormat(&esiz);
410
411 strcpy(hdr, HDRSTR);
412 strcat(hdr, "RADIANCE\n");
413 begData = strlen(hdr);
414 strcpy(hdr+begData, rcp->GetHeadStr());
415 begData += rcp->GetHeadLen();
416 if (omod) {
417 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
418 begData += strlen(hdr+begData);
419 }
420 if (obin >= 0) {
421 sprintf(hdr+begData, "BIN=%d\n", obin);
422 begData += strlen(hdr+begData);
423 }
424 strcpy(hdr+begData, ROWZEROSTR);
425 rowCountPos = begData+LNROWSTR;
426 begData += sizeof(ROWZEROSTR)-1;
427 if (!rcp->xres | (rowBytes > esiz)) {
428 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
429 begData += strlen(hdr+begData);
430 }
431 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
432 begData += strlen(hdr+begData);
433 if (NCSAMP > 3) {
434 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
435 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
436 begData += strlen(hdr+begData);
437 }
438 if (etyp != 'c') {
439 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
440 begData += strlen(hdr+begData);
441 }
442 int align = 0;
443 switch (etyp) {
444 case 'f':
445 align = sizeof(float);
446 break;
447 case 'd':
448 align = sizeof(double);
449 break;
450 case 'c':
451 break;
452 default:
453 error(INTERNAL, "unsupported data type in NewHeader()");
454 return false;
455 }
456 strcpy(hdr+begData, FMTSTR); // write format string
457 begData += strlen(hdr+begData);
458 strcpy(hdr+begData, formstr(etyp));
459 begData += strlen(hdr+begData);
460 if (align) // align data at end of header
461 while ((begData+2) % align)
462 hdr[begData++] = ' ';
463 hdr[begData++] = '\n'; // EOL for data format
464 hdr[begData++] = '\n'; // end of nominal header
465 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
466 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
467 begData += strlen(hdr+begData);
468 }
469 return rData->ReleaseMemory(hdr, RDSwrite);
470 }
471
472 // find string argument in header for named variable
473 static const char *
474 findArgs(const char *hdr, const char *vnm, int len)
475 {
476 const char * npos = strnstr(hdr, vnm, len);
477
478 if (!npos) return NULL;
479
480 npos += strlen(vnm); // find start of (first) argument
481 len -= npos - hdr;
482 while (len-- > 0 && (*npos == '=') | isspace(*npos))
483 if (*npos++ == '\n')
484 return NULL;
485
486 return (len >= 0) ? npos : NULL;
487 }
488
489 // Load and check header in read/write channel
490 int
491 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
492 {
493 int esiz;
494 const int etyp = rcp->GetFormat(&esiz);
495 const int maxlen = rcp->GetHeadLen() + 1024;
496 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
497 const char * cp;
498 // find end of header
499 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
500 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
501 error(USER, errmsg);
502 return -1;
503 }
504 begData = cp - hdr + 1; // increment again at end
505 // check # components
506 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
507 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
508 error(USER, errmsg);
509 return -1;
510 }
511 // check format
512 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
513 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
514 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
515 error(USER, errmsg);
516 return -1;
517 }
518 // check #columns
519 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
520 !rcp->xres | (rowBytes > esiz)) {
521 sprintf(errmsg, "expected NCOLS=%d in '%s'",
522 int(rowBytes/esiz), GetName());
523 error(USER, errmsg);
524 return -1;
525 }
526 // find row count
527 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
528 sprintf(errmsg, "missing NROWS in '%s'", GetName());
529 error(USER, errmsg);
530 return -1;
531 }
532 rowCountPos = cp - hdr;
533 int rlast = atoi(cp);
534 begData++; // advance past closing EOL
535 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
536 char rbuf[64];
537 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
538 int rlen = strlen(rbuf);
539 if (strncmp(rbuf, hdr+begData, rlen)) {
540 sprintf(errmsg, "bad resolution string in '%s'", GetName());
541 error(USER, errmsg);
542 return -1;
543 }
544 begData += rlen;
545 }
546 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
547 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
548 }
549
550 // Rewind calculation (previous results unchanged)
551 bool
552 RcontribSimulManager::ResetRow(int r)
553 {
554 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
555 error(WARNING, "ignoring bad call to ResetRow()");
556 return (r == rInPos);
557 }
558 FlushQueue(); // finish current and reset
559 for (RcontribOutput *op = outList; op; op = op->next)
560 if (!op->SetRowsDone(r))
561 return false;
562
563 rowsDone.ClearBits(r, rInPos-r, false);
564 rInPos = r;
565 return true;
566 }
567
568 // call-back for averaging each modifier's contributions to assigned channel(s)
569 static int
570 putModContrib(const LUENT *lp, void *p)
571 {
572 RcontribMod * mp = (RcontribMod *)lp->data;
573 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
574 const double sca = 1./rcp->accum;
575 const DCOLORV * dvp = mp->cbin;
576 RcontribOutput * op = mp->opl;
577 int i, n;
578
579 switch (rcp->GetFormat()) { // conversion based on output type
580 case 'd': {
581 double * dvo = (double *)op->InsertionP(mp->coffset);
582 for (n = mp->nbins; n--; ) {
583 for (i = 0; i < NCSAMP; i++)
584 *dvo++ = *dvp++ * sca;
585 if ((op->obin >= 0) & (n > 0))
586 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
587 }
588 } break;
589 case 'f': {
590 float * fvo = (float *)op->InsertionP(mp->coffset);
591 for (n = mp->nbins; n--; ) {
592 for (i = 0; i < NCSAMP; i++)
593 *fvo++ = float(*dvp++ * sca);
594 if ((op->obin >= 0) & (n > 0))
595 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
596 }
597 } break;
598 case 'c': {
599 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
600 for (n = mp->nbins; n--; ) {
601 SCOLOR scol;
602 for (i = 0; i < NCSAMP; i++)
603 scol[i] = COLORV(*dvp++ * sca);
604 scolor_scolr(cvo, scol);
605 cvo += LSCOLR;
606 if ((op->obin >= 0) & (n > 0))
607 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
608 }
609 } break;
610 default:
611 error(CONSISTENCY, "unsupported output type in putModContrib()");
612 return -1;
613 }
614 // clear for next tally
615 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
616 return 1;
617 }
618
// Add a ray/bundle to compute next record.
// Returns accum on success, 0 if not Ready() or already past the last
// record, and -1 on error.  In the parent (nkids > 0) the bundle is sent
// to a free child after its RowAssignment, and no tracing happens here.
// In a solo process (nkids == 0) or a child (StartKids() leaves nkids at
// -1 there) the bundle is traced and the row is written to every output;
// only the solo process marks the row done and advances rInPos.
int
RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
{
	if (!Ready())
		return 0;
	if (rInPos >= outList->nRows) {
		error(WARNING, "ComputeRecord() called after last record");
		return 0;
	}
	if (nkids > 0) {			// in parent process?
		int k = GetChild(false);	// updates output rows
		if (k < 0) return -1;		// someone died?
		RowAssignment rass;
		rass.row = kidRow[k] = rInPos++;	// child k now owns this row
		rass.ac = accum;
						// assignment, then 2*accum vectors
		if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
				writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
				!= sizeof(FVECT)*2*accum) {
			error(SYSTEM, "cannot write to child; dead process?");
			return -1;
		}
		return accum;			// tracing/output happens in child
	}
	if (NCSAMP != nChan) {
		error(INTERNAL, "number of color channels must remain fixed");
		return -1;
	}
						// actual work is done here...
	if (EnqueueBundle(orig_direc, accum) < 0)
		return -1;

	RcontribOutput * op;			// prepare output buffers
	for (op = outList; op; op = op->next)
		if (!op->GetRow(rInPos))
			return -1;
						// convert averages & clear
	if (lu_doall(&modLUT, putModContrib, this) < 0)
		return -1;
						// write buffers
	for (op = outList; op; op = op->next)
		op->DoneRow();

	// a child's row is recorded by the parent in GetChild() after the
	// child writes ROW_DONE (see RunChild()), so children skip this
	if (!nkids)				// update row if solo process
		UpdateRowsDone(rInPos++);	// XXX increment here is critical

	return accum;
}
667
668 // Get next available child, returning index or -1 if forceWait & idling
669 int
670 RcontribSimulManager::GetChild(bool forceWait)
671 {
672 if (nkids <= 0)
673 return -1;
674 // take inventory
675 int pn, n = 0;
676 fd_set readset, errset;
677 FD_ZERO(&readset); FD_ZERO(&errset);
678 for (pn = nkids; pn--; ) {
679 if (kidRow[pn] < 0) { // child already ready?
680 if (forceWait) continue;
681 return pn; // good enough
682 }
683 FD_SET(kid[pn].r, &readset); // will check on this one
684 FD_SET(kid[pn].r, &errset);
685 if (kid[pn].r >= n)
686 n = kid[pn].r + 1;
687 }
688 if (!n) // every child is idle?
689 return -1;
690 // wait on "busy" child(ren)
691 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
692 if (errno != EINTR) {
693 error(SYSTEM, "select call failed in GetChild()");
694 return -1;
695 }
696 char buf[sizeof(ROW_DONE)] = "X";
697 pn = -1; // get flags set by select
698 for (n = nkids; n--; )
699 if (kidRow[n] >= 0 &&
700 FD_ISSET(kid[n].r, &readset) |
701 FD_ISSET(kid[n].r, &errset)) {
702 // check for error
703 if (FD_ISSET(kid[n].r, &errset) ||
704 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
705 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
706 return -1;
707 // update output row counts
708 UpdateRowsDone(kidRow[n]);
709 kidRow[n] = -1; // flag child available
710 pn = n;
711 }
712 return pn;
713 }
714
715 // Update row completion bitmap and update outputs (does not change rInPos)
716 bool
717 RcontribSimulManager::UpdateRowsDone(int r)
718 {
719 if (!rowsDone.TestAndSet(r)) {
720 error(WARNING, "redundant call to UpdateRowsDone()");
721 return false;
722 }
723 int nDone = GetRowFinished();
724 if (nDone <= r)
725 return true; // nothing to update, yet
726 for (RcontribOutput *op = outList; op; op = op->next)
727 if (!op->SetRowsDone(nDone))
728 return false;
729 return true; // up-to-date
730 }
731
732 // Run rcontrib child process (never returns)
733 void
734 RcontribSimulManager::RunChild()
735 {
736 FVECT * vecList = NULL;
737 RowAssignment rass;
738 ssize_t nr;
739
740 accum = 0;
741 errno = 0;
742 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
743 if (!rass.ac) {
744 error(CONSISTENCY, "bad accumulator count in child");
745 exit(1);
746 }
747 if (rass.ac > accum) {
748 efree(vecList);
749 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
750 }
751 accum = rass.ac;
752 rInPos = rass.row;
753
754 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
755 sizeof(FVECT)*2*accum)
756 break;
757
758 if (ComputeRecord(vecList) <= 0)
759 exit(1);
760 // signal this row is done
761 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
762 exit(1);
763 }
764 if (nr) {
765 error(SYSTEM, "read error in child process");
766 exit(1);
767 }
768 exit(0);
769 }
770
// Add child processes as indicated.
// Grows the brood by n2go worker processes created with open_process()
// and no command.  Returns false if n2go <= 0, if the total would be only
// one process, or if a fork fails; true once all children are started.
// In each new child, the pipe ends inherited for earlier siblings are
// closed, the kid tables are freed, and RunChild() takes over.
bool
RcontribSimulManager::StartKids(int n2go)
{
	if ((n2go <= 0) | (nkids + n2go <= 1))
		return false;

	if (!nkids)
		cow_memshare();			// preload objects

	n2go += nkids;				// => desired brood size
	kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
	kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);

	fflush(stdout);				// shouldn't use, anyway
	while (nkids < n2go) {
		kid[nkids] = sp_inactive;
		int rv = open_process(&kid[nkids], NULL);
		if (!rv) {			// in child process?
			// close pipes to earlier siblings; this loop leaves
			// nkids == -1, which marks the process as a child
			// (GetChild()/StopKids() then see nkids <= 0)
			while (nkids-- > 0) {
				close(kid[nkids].r);
				close(kid[nkids].w);
			}
			free(kid); free(kidRow);
			kid = NULL; kidRow = NULL;
			RunChild();		// should never return
			_exit(1);
		}
		if (rv < 0) {
			error(SYSTEM, "cannot fork worker process");
			return false;
		}
		kidRow[nkids++] = -1;		// newborn is ready
	}
	return true;
}
807
808 // Reap the indicated number of children (all if 0)
809 int
810 RcontribSimulManager::StopKids(int n2end)
811 {
812 if (nkids <= 0)
813 return 0;
814 FlushQueue();
815 int status = 0;
816 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
817 status = close_processes(kid, nkids);
818 free(kid); free(kidRow);
819 kid = NULL; kidRow = NULL;
820 nkids = 0;
821 } else { // else shrink family
822 int st;
823 while (n2end-- > 0)
824 if ((st = close_process(&kid[--nkids])) > 0)
825 status = st;
826 // could call realloc(), but it hardly matters
827 }
828 if (status) {
829 sprintf(errmsg, "non-zero (%d) status from child", status);
830 error(WARNING, errmsg);
831 }
832 return status;
833 }
834
835 // Set number of computation threads (0 => #cores)
836 int
837 RcontribSimulManager::SetThreadCount(int nt)
838 {
839 if (!Ready()) {
840 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
841 return 0;
842 }
843 if (nt < 0)
844 return NThreads();
845 if (!nt) nt = GetNCores();
846 int status = 0;
847 if (nt == 1)
848 status = StopKids();
849 else if (nt < nkids)
850 status = StopKids(nkids-nt);
851 else if (nt > nkids)
852 StartKids(nt-nkids);
853 if (status) {
854 sprintf(errmsg, "non-zero (%d) status from child", status);
855 error(WARNING, errmsg);
856 }
857 return NThreads();
858 }