ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/Development/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.17
Committed: Thu Oct 23 16:33:39 2025 UTC (13 days, 6 hours ago) by greg
Branch: MAIN
Changes since 2.16: +10 -3 lines
Log Message:
refactor: Added fileDataShare() definition for convenience

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.16 2025/10/23 01:26:48 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
// Context name passed to worldfunc() before evaluating a bin expression
char RCCONTEXT[] = "RC.";

// Header-related strings defined outside this file
extern const char HDRSTR[];
extern const char BIGEND[];
extern const char FMTSTR[];

// Data share open flags, indexed by RCOutputOp:
// new/exclusive, overwrite if exists, or recover data
int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
RDSread|RDSwrite};

// Placeholder row count written into each new header by NewHeader()
static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
// Offset of the digits within ROWZEROSTR (length of "NROWS=")
#define LNROWSTR 6
34 // Modifier channel for recording contributions (no constructor/destructor)
35 struct RcontribMod {
36 RcontribOutput * opl; // pointer to first output channel
37 char * params; // parameters string
38 EPNODE * binv; // bin expression (NULL if 1 bin)
39 int nbins; // bin count this modifier
40 int coffset; // column offset in bytes
41 DCOLORV cbin[1]; // bin accumulator (extends struct)
42 /// Access specific (spectral) color bin
43 DCOLORV * operator[](int n) {
44 if ((n < 0) | (n >= nbins)) return NULL;
45 return cbin + n*NCSAMP;
46 }
47 };
48
// Struct used to assign record calculation to child
// (written by the parent in ComputeRecord(), read by the child in RunChild())
struct RowAssignment {
uint32 row; // row to do
uint32 ac; // accumulation count
};

// Message a child writes back once its assigned row has been computed;
// GetChild() compares what it reads against this string
static const char ROW_DONE[] = "ROW FINISHED\n";
56
57 // allocate rcontrib accumulator
58 static RcontribMod *
59 NewRcMod(const char *prms, const char *binexpr, int ncbins)
60 {
61 if (binexpr && !*binexpr) binexpr = NULL;
62 if (!prms) prms = "";
63 if ((ncbins > 1) & !binexpr) {
64 error(USER, "missing bin expression");
65 return NULL;
66 }
67 if (ncbins < 1) ncbins = 1;
68
69 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
70 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
71 strlen(prms)+1);
72
73 if (binexpr) { // get/check bin expression
74 mp->binv = eparse(const_cast<char *>(binexpr));
75 if (mp->binv->type == NUM) { // constant expression (0)?
76 if ((int)evalue(mp->binv) != 0) {
77 sprintf(errmsg, "illegal non-zero constant for bin (%s)",
78 binexpr);
79 error(USER, errmsg);
80 }
81 if (ncbins > 1) {
82 sprintf(errmsg, "bad bin count (%d should be 1)", ncbins);
83 error(USER, errmsg);
84 }
85 epfree(mp->binv, true);
86 mp->binv = NULL;
87 prms = "";
88 ncbins = 1;
89 }
90 }
91 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
92 mp->nbins = ncbins;
93 return mp;
94 }
95
96 // Free an RcontribMod (public for RcontribSimulManager constructor)
97 void
98 FreeRcMod(void *p)
99 {
100 if (!p) return;
101 EPNODE * bep = (*(RcontribMod *)p).binv;
102 if (bep) epfree(bep, true);
103 efree(p);
104 }
105
106 // Get format identifier
107 const char *
108 formstr(int f)
109 {
110 switch (f) {
111 case 'a': return("ascii");
112 case 'f': return("float");
113 case 'd': return("double");
114 case 'c': return(NCSAMP==3 ? COLRFMT : SPECFMT);
115 }
116 return("unknown");
117 }
118
119 // Standard file data share function
120 RdataShare *
121 fileDataShare(const char *name, RCOutputOp op, size_t siz)
122 {
123 return new RdataShareFile(name, RSDOflags[op], siz);
124 }
125
126 // Memory-mapped data share function
127 RdataShare *
128 mapDataShare(const char *name, RCOutputOp op, size_t siz)
129 {
130 return new RdataShareMap(name, RSDOflags[op], siz);
131 }
132
133 // Set output format ('f', 'd', or 'c')
134 bool
135 RcontribSimulManager::SetDataFormat(int ty)
136 {
137 if (outList) {
138 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
139 return false;
140 }
141 switch (ty) {
142 case 'f':
143 dsiz = sizeof(float)*NCSAMP;
144 break;
145 case 'd':
146 dsiz = sizeof(double)*NCSAMP;
147 break;
148 case 'c':
149 dsiz = LSCOLR;
150 break;
151 default:
152 sprintf(errmsg, "unsupported output format '%c'", ty);
153 error(INTERNAL, errmsg);
154 return false;
155 }
156 dtyp = ty;
157 return true;
158 }
159
160 // static call-back for rcontrib ray-tracing
161 int
162 RcontribSimulManager::RctCall(RAY *r, void *cd)
163 {
164 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
165 return 0;
166 // shadow ray not on source?
167 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
168 return 0;
169
170 const char * mname = objptr(r->ro->omod)->oname;
171 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
172 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
173 if (!mp)
174 return 0; // not in our modifier list
175
176 int bi = 0; // get bin index
177 if (mp->binv) {
178 worldfunc(RCCONTEXT, r);
179 set_eparams(mp->params);
180 double bval = evalue(mp->binv);
181 if (bval <= -.5)
182 return 0; // silently ignore negative bin index
183 bi = int(bval + .5);
184 }
185 DCOLORV * dvp = (*mp)[bi];
186 if (!dvp) {
187 sprintf(errmsg, "bad bin number for '%s' (%d ignored)", mname, bi);
188 error(WARNING, errmsg);
189 return 0;
190 }
191 SCOLOR contr;
192 raycontrib(contr, r, PRIMARY); // compute coefficient
193 if (rcp->HasFlag(RCcontrib))
194 smultscolor(contr, r->rcol); // -> value contribution
195
196 for (int i = 0; i < NCSAMP; i++)
197 *dvp++ += contr[i]; // accumulate color/spectrum
198 return 1;
199 }
200
// Look in printf-style string tst for the first conversion whose type
// character appears in match.  Returns the offset of that character
// within tst, or 0 if there is no such conversion.  Conversions of other
// types (including "%%") are skipped.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
		while (*cp && *cp != '%')	// advance to next '%'
			cp++;
		if (!*cp)
			return 0;
		cp++;				// step over the '%'
						// skip flags, width, precision
		while (*cp && !strchr(convChars, *cp))
			cp++;
		if (!*cp)			// unterminated conversion
			return 0;
		if (strchr(match, *cp))
			return int(cp - tst);
		cp++;				// some other conversion; keep looking
	}
}
218
219 // Add a modifier and arguments, opening output file(s)
220 bool
221 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
222 const char *prms, const char *binval, int bincnt)
223 {
224 if (!modn | !outspec || !*modn | !*outspec) {
225 error(WARNING, "ignoring bad call to AddModifier()");
226 return false;
227 }
228 if (*outspec == '!') {
229 error(USER, "command output not supported by RcontribSimulManager");
230 return false;
231 }
232 if (!nChan) { // initial call?
233 if ((xres < 0) | (yres <= 0)) {
234 error(USER, "xres, yres must be set before first modifier");
235 return false;
236 }
237 if (!SetDataFormat(dtyp))
238 return false;
239 nChan = NCSAMP;
240 } else if (nChan != NCSAMP) {
241 error(USER, "# spectral channels must be fixed in AddModifier()");
242 return false;
243 }
244 if (Ready()) {
245 error(INTERNAL, "call to AddModifier() after PrepOutput()");
246 return false;
247 }
248 LUENT * lp = lu_find(&modLUT, modn);
249 if (lp->data) {
250 sprintf(errmsg, "duplicate modifier '%s'", modn);
251 error(USER, errmsg);
252 return false;
253 }
254 if (!lp->key) // create new entry
255 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
256
257 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
258 if (!mp) return false;
259 lp->data = (char *)mp;
260 const int modndx = hasFormat(outspec, "sb");
261 const int binndx = hasFormat(outspec, "diouxX");
262 int bin0 = 0;
263 char fnbuf[512];
264 if (!modndx || (binndx > 0) & (modndx > binndx))
265 sprintf(fnbuf, outspec, bin0, modn);
266 else
267 sprintf(fnbuf, outspec, modn, bin0);
268 RcontribOutput * olast = NULL;
269 RcontribOutput * op;
270 for (op = outList; op; op = (olast=op)->next) {
271 if (strcmp(op->GetName(), fnbuf))
272 continue;
273 if (modndx) { // this ain't right
274 sprintf(errmsg, "output name collision for '%s'", fnbuf);
275 error(USER, errmsg);
276 return false;
277 }
278 if ((binndx > 0) & (xres > 0)) {
279 sprintf(fnbuf, outspec, ++bin0);
280 continue; // each bin goes to another image
281 }
282 mp->coffset = op->rowBytes; // else add to what's there
283 break;
284 }
285 if (!op) { // create new output channel?
286 op = new RcontribOutput(fnbuf);
287 if (olast) olast->next = op;
288 else outList = op;
289 }
290 mp->opl = op; // first (maybe only) output channel
291 if (modndx) // remember modifier if part of name
292 op->omod = lp->key;
293 if (binndx) { // append output image/bin list
294 op->rowBytes += dsiz;
295 op->obin = bin0;
296 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
297 if (!modndx || (binndx > 0) & (modndx > binndx))
298 sprintf(fnbuf, outspec, bin0+bincnt, modn);
299 else
300 sprintf(fnbuf, outspec, modn, bin0+bincnt);
301 if (!op->next) {
302 olast = op;
303 olast->next = op = new RcontribOutput(fnbuf);
304 if (modndx) op->omod = lp->key;
305 op->obin = bin0+bincnt;
306 } else {
307 op = op->next;
308 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
309 "bin number mismatch in AddModifier()");
310 }
311 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
312 "row offset mismatch in AddModifier()");
313 op->rowBytes += dsiz;
314 }
315 } else // else send all results to this channel
316 op->rowBytes += mp->nbins*dsiz;
317 return true;
318 }
319
320 // Add a file of modifiers with associated arguments
321 bool
322 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
323 const char *prms,
324 const char *binval, int bincnt)
325 {
326 char * path = getpath(const_cast<char *>(modfn),
327 getrlibpath(), R_OK);
328 FILE * fp;
329 if (!path || !(fp = fopen(path, "r"))) {
330 sprintf(errmsg, "cannot %s modifier file '%s'",
331 path ? "open" : "find", modfn);
332 error(SYSTEM, errmsg);
333 return false;
334 }
335 char mod[MAXSTR];
336 while (fgetword(mod, sizeof(mod), fp))
337 if (!AddModifier(mod, outspec, prms, binval, bincnt)) {
338 fclose(fp);
339 return false;
340 }
341 fclose(fp);
342 return true;
343 }
344
345 // call-back to check if modifier has been loaded
346 static int
347 checkModExists(const LUENT *lp, void *p)
348 {
349 OBJECT mod = modifier(lp->key);
350
351 if ((mod != OVOID) & (mod < nsceneobjs))
352 return 1;
353
354 sprintf(errmsg, "tracked modifier '%s' not found in main scene", lp->key);
355 error(WARNING, errmsg);
356 return 0;
357 }
358
359 // Prepare output channels and return # completed rows
360 int
361 RcontribSimulManager::PrepOutput()
362 {
363 if (!outList || !RtraceSimulManager::Ready()) {
364 error(INTERNAL, "PrepOutput() called before octree & modifiers set");
365 return -1;
366 }
367 if (!cdsF) {
368 error(INTERNAL, "missing RdataShare constructor call (cdsF)");
369 return -1;
370 }
371 if (lu_doall(&modLUT, checkModExists, NULL) < 0)
372 return -1;
373
374 outList->nRows = yres * (xres + !xres); // all outputs have same #rows
375 int remWarnings = 20;
376 for (RcontribOutput *op = outList; op; op = op->next) {
377 if (op->rData) {
378 error(INTERNAL, "output channel already open in PrepOutput()");
379 return -1;
380 }
381 op->nRows = outList->nRows;
382 op->rData = (*cdsF)(op->ofname, outOp,
383 GetHeadLen()+1024 + op->nRows*op->rowBytes);
384 freeqstr(op->ofname); op->ofname = NULL;
385 if (outOp == RCOrecover) {
386 int rd = op->CheckHeader(this);
387 if (rd < 0)
388 return -1;
389 if (rd >= op->nRows) {
390 if (remWarnings >= 0) {
391 sprintf(errmsg, "recovered output '%s' is complete",
392 op->GetName());
393 error(WARNING, --remWarnings ? errmsg : "etc...");
394 }
395 rd = op->nRows;
396 }
397 if (!rInPos | (rInPos > rd)) rInPos = rd;
398 } else if (!op->NewHeader(this))
399 return -1;
400 // make sure there's room
401 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
402 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
403 return -1; // calls error() for us
404 }
405 rowsDone.NewBitMap(outList->nRows); // create row completion map
406 rowsDone.ClearBits(0, rInPos, true);
407 return rInPos;
408 }
409
410 // Create header in open write-only channel
411 bool
412 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
413 {
414 int headlim = rcp->GetHeadLen() + 1024;
415 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
416 int esiz;
417 const int etyp = rcp->GetFormat(&esiz);
418
419 strcpy(hdr, HDRSTR);
420 strcat(hdr, "RADIANCE\n");
421 begData = strlen(hdr);
422 strcpy(hdr+begData, rcp->GetHeadStr());
423 begData += rcp->GetHeadLen();
424 if (omod) {
425 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
426 begData += strlen(hdr+begData);
427 }
428 if (obin >= 0) {
429 sprintf(hdr+begData, "BIN=%d\n", obin);
430 begData += strlen(hdr+begData);
431 }
432 strcpy(hdr+begData, ROWZEROSTR);
433 rowCountPos = begData+LNROWSTR;
434 begData += sizeof(ROWZEROSTR)-1;
435 if (!rcp->xres | (rowBytes > esiz)) {
436 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
437 begData += strlen(hdr+begData);
438 }
439 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
440 begData += strlen(hdr+begData);
441 if (NCSAMP > 3) {
442 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
443 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
444 begData += strlen(hdr+begData);
445 }
446 if (etyp != 'c') {
447 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
448 begData += strlen(hdr+begData);
449 }
450 int align = 0;
451 switch (etyp) {
452 case 'f':
453 align = sizeof(float);
454 break;
455 case 'd':
456 align = sizeof(double);
457 break;
458 case 'c':
459 break;
460 default:
461 error(INTERNAL, "unsupported data type in NewHeader()");
462 return false;
463 }
464 strcpy(hdr+begData, FMTSTR); // write format string
465 begData += strlen(hdr+begData);
466 strcpy(hdr+begData, formstr(etyp));
467 begData += strlen(hdr+begData);
468 if (align) // align data at end of header
469 while ((begData+2) % align)
470 hdr[begData++] = ' ';
471 hdr[begData++] = '\n'; // EOL for data format
472 hdr[begData++] = '\n'; // end of nominal header
473 if ((rcp->xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
474 sprintf(hdr+begData, PIXSTDFMT, rcp->yres, rcp->xres);
475 begData += strlen(hdr+begData);
476 }
477 return rData->ReleaseMemory(hdr, RDSwrite);
478 }
479
480 // find string argument in header for named variable
481 static const char *
482 findArgs(const char *hdr, const char *vnm, int len)
483 {
484 const char * npos = strnstr(hdr, vnm, len);
485
486 if (!npos) return NULL;
487
488 npos += strlen(vnm); // find start of (first) argument
489 len -= npos - hdr;
490 while (len-- > 0 && (*npos == '=') | isspace(*npos))
491 if (*npos++ == '\n')
492 return NULL;
493
494 return (len >= 0) ? npos : NULL;
495 }
496
497 // Load and check header in read/write channel
498 int
499 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
500 {
501 int esiz;
502 const int etyp = rcp->GetFormat(&esiz);
503 const int maxlen = rcp->GetHeadLen() + 1024;
504 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
505 const char * cp;
506 // find end of header
507 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
508 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
509 error(USER, errmsg);
510 return -1;
511 }
512 begData = cp - hdr + 1; // increment again at end
513 // check # components
514 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
515 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
516 error(USER, errmsg);
517 return -1;
518 }
519 // check format
520 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
521 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
522 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
523 error(USER, errmsg);
524 return -1;
525 }
526 // check #columns
527 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
528 !rcp->xres | (rowBytes > esiz)) {
529 sprintf(errmsg, "expected NCOLS=%d in '%s'",
530 int(rowBytes/esiz), GetName());
531 error(USER, errmsg);
532 return -1;
533 }
534 // find row count
535 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
536 sprintf(errmsg, "missing NROWS in '%s'", GetName());
537 error(USER, errmsg);
538 return -1;
539 }
540 rowCountPos = cp - hdr;
541 int rlast = atoi(cp);
542 begData++; // advance past closing EOL
543 if ((rcp->xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
544 char rbuf[64];
545 sprintf(rbuf, PIXSTDFMT, rcp->yres, rcp->xres);
546 int rlen = strlen(rbuf);
547 if (strncmp(rbuf, hdr+begData, rlen)) {
548 sprintf(errmsg, "bad resolution string in '%s'", GetName());
549 error(USER, errmsg);
550 return -1;
551 }
552 begData += rlen;
553 }
554 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
555 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
556 }
557
558 // Rewind calculation (previous results unchanged)
559 bool
560 RcontribSimulManager::ResetRow(int r)
561 {
562 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
563 error(WARNING, "ignoring bad call to ResetRow()");
564 return (r == rInPos);
565 }
566 FlushQueue(); // finish current and reset
567 for (RcontribOutput *op = outList; op; op = op->next)
568 if (!op->SetRowsDone(r))
569 return false;
570
571 rowsDone.ClearBits(r, rInPos-r, false);
572 rInPos = r;
573 return true;
574 }
575
576 // call-back for averaging each modifier's contributions to assigned channel(s)
577 static int
578 putModContrib(const LUENT *lp, void *p)
579 {
580 RcontribMod * mp = (RcontribMod *)lp->data;
581 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
582 const double sca = 1./rcp->accum;
583 const DCOLORV * dvp = mp->cbin;
584 RcontribOutput * op = mp->opl;
585 int i, n;
586
587 switch (rcp->GetFormat()) { // conversion based on output type
588 case 'd': {
589 double * dvo = (double *)op->InsertionP(mp->coffset);
590 for (n = mp->nbins; n--; ) {
591 for (i = 0; i < NCSAMP; i++)
592 *dvo++ = *dvp++ * sca;
593 if ((op->obin >= 0) & (n > 0))
594 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
595 }
596 } break;
597 case 'f': {
598 float * fvo = (float *)op->InsertionP(mp->coffset);
599 for (n = mp->nbins; n--; ) {
600 for (i = 0; i < NCSAMP; i++)
601 *fvo++ = float(*dvp++ * sca);
602 if ((op->obin >= 0) & (n > 0))
603 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
604 }
605 } break;
606 case 'c': {
607 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
608 for (n = mp->nbins; n--; ) {
609 SCOLOR scol;
610 for (i = 0; i < NCSAMP; i++)
611 scol[i] = COLORV(*dvp++ * sca);
612 scolor_scolr(cvo, scol);
613 cvo += LSCOLR;
614 if ((op->obin >= 0) & (n > 0))
615 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
616 }
617 } break;
618 default:
619 error(CONSISTENCY, "unsupported output type in putModContrib()");
620 return -1;
621 }
622 // clear for next tally
623 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
624 return 1;
625 }
626
627 // Add a ray/bundle to compute next record
628 int
629 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
630 {
631 if (!Ready())
632 return 0;
633 if (rInPos >= outList->nRows) {
634 error(WARNING, "ComputeRecord() called after last record");
635 return 0;
636 }
637 if (nkids > 0) { // in parent process?
638 int k = GetChild(false); // updates output rows
639 if (k < 0) return -1; // someone died?
640 RowAssignment rass;
641 rass.row = kidRow[k] = rInPos++;
642 rass.ac = accum;
643 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
644 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
645 != sizeof(FVECT)*2*accum) {
646 error(SYSTEM, "cannot write to child; dead process?");
647 return -1;
648 }
649 return accum; // tracing/output happens in child
650 }
651 if (NCSAMP != nChan) {
652 error(INTERNAL, "number of color channels must remain fixed");
653 return -1;
654 }
655 // actual work is done here...
656 if (EnqueueBundle(orig_direc, accum) < 0)
657 return -1;
658
659 RcontribOutput * op; // prepare output buffers
660 for (op = outList; op; op = op->next)
661 if (!op->GetRow(rInPos))
662 return -1;
663 // convert averages & clear
664 if (lu_doall(&modLUT, putModContrib, this) < 0)
665 return -1;
666 // write buffers
667 for (op = outList; op; op = op->next)
668 op->DoneRow();
669
670 if (!nkids) // update row if solo process
671 UpdateRowsDone(rInPos++); // XXX increment here is critical
672
673 return accum;
674 }
675
676 // Get next available child, returning index or -1 if forceWait & idling
677 int
678 RcontribSimulManager::GetChild(bool forceWait)
679 {
680 if (nkids <= 0)
681 return -1;
682 // take inventory
683 int pn, n = 0;
684 fd_set readset, errset;
685 FD_ZERO(&readset); FD_ZERO(&errset);
686 for (pn = nkids; pn--; ) {
687 if (kidRow[pn] < 0) { // child already ready?
688 if (forceWait) continue;
689 return pn; // good enough
690 }
691 FD_SET(kid[pn].r, &readset); // will check on this one
692 FD_SET(kid[pn].r, &errset);
693 if (kid[pn].r >= n)
694 n = kid[pn].r + 1;
695 }
696 if (!n) // every child is idle?
697 return -1;
698 // wait on "busy" child(ren)
699 while ((n = select(n, &readset, NULL, &errset, NULL)) <= 0)
700 if (errno != EINTR) {
701 error(SYSTEM, "select call failed in GetChild()");
702 return -1;
703 }
704 char buf[sizeof(ROW_DONE)] = "X";
705 pn = -1; // get flags set by select
706 for (n = nkids; n--; )
707 if (kidRow[n] >= 0 &&
708 FD_ISSET(kid[n].r, &readset) |
709 FD_ISSET(kid[n].r, &errset)) {
710 // check for error
711 if (FD_ISSET(kid[n].r, &errset) ||
712 read(kid[n].r, buf, sizeof(ROW_DONE)) <= 0 ||
713 memcmp(buf, ROW_DONE, sizeof(ROW_DONE)))
714 return -1;
715 // update output row counts
716 UpdateRowsDone(kidRow[n]);
717 kidRow[n] = -1; // flag child available
718 pn = n;
719 }
720 return pn;
721 }
722
723 // Update row completion bitmap and update outputs (does not change rInPos)
724 bool
725 RcontribSimulManager::UpdateRowsDone(int r)
726 {
727 if (!rowsDone.TestAndSet(r)) {
728 error(WARNING, "redundant call to UpdateRowsDone()");
729 return false;
730 }
731 int nDone = GetRowFinished();
732 if (nDone <= r)
733 return true; // nothing to update, yet
734 for (RcontribOutput *op = outList; op; op = op->next)
735 if (!op->SetRowsDone(nDone))
736 return false;
737 return true; // up-to-date
738 }
739
740 // Run rcontrib child process (never returns)
741 void
742 RcontribSimulManager::RunChild()
743 {
744 FVECT * vecList = NULL;
745 RowAssignment rass;
746 ssize_t nr;
747
748 accum = 0;
749 errno = 0;
750 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
751 if (!rass.ac) {
752 error(CONSISTENCY, "bad accumulator count in child");
753 exit(1);
754 }
755 if (rass.ac > accum) {
756 efree(vecList);
757 vecList = (FVECT *)emalloc(sizeof(FVECT)*2*rass.ac);
758 }
759 accum = rass.ac;
760 rInPos = rass.row;
761
762 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
763 sizeof(FVECT)*2*accum)
764 break;
765
766 if (ComputeRecord(vecList) <= 0)
767 exit(1);
768 // signal this row is done
769 if (write(1, ROW_DONE, sizeof(ROW_DONE)) != sizeof(ROW_DONE))
770 exit(1);
771 }
772 if (nr) {
773 error(SYSTEM, "read error in child process");
774 exit(1);
775 }
776 exit(0);
777 }
778
779 // Add child processes as indicated
780 bool
781 RcontribSimulManager::StartKids(int n2go)
782 {
783 if ((n2go <= 0) | (nkids + n2go <= 1))
784 return false;
785
786 if (!nkids)
787 cow_memshare(); // preload objects
788
789 n2go += nkids; // => desired brood size
790 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
791 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
792
793 fflush(stdout); // shouldn't use, anyway
794 while (nkids < n2go) {
795 kid[nkids] = sp_inactive;
796 int rv = open_process(&kid[nkids], NULL);
797 if (!rv) { // in child process?
798 while (nkids-- > 0) {
799 close(kid[nkids].r);
800 close(kid[nkids].w);
801 }
802 free(kid); free(kidRow);
803 kid = NULL; kidRow = NULL;
804 RunChild(); // should never return
805 _exit(1);
806 }
807 if (rv < 0) {
808 error(SYSTEM, "cannot fork worker process");
809 return false;
810 }
811 kidRow[nkids++] = -1; // newborn is ready
812 }
813 return true;
814 }
815
816 // Reap the indicated number of children (all if 0)
817 int
818 RcontribSimulManager::StopKids(int n2end)
819 {
820 if (nkids <= 0)
821 return 0;
822 FlushQueue();
823 int status = 0;
824 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
825 status = close_processes(kid, nkids);
826 free(kid); free(kidRow);
827 kid = NULL; kidRow = NULL;
828 nkids = 0;
829 } else { // else shrink family
830 int st;
831 while (n2end-- > 0)
832 if ((st = close_process(&kid[--nkids])) > 0)
833 status = st;
834 // could call realloc(), but it hardly matters
835 }
836 if (status) {
837 sprintf(errmsg, "non-zero (%d) status from child", status);
838 error(WARNING, errmsg);
839 }
840 return status;
841 }
842
843 // Set number of computation threads (0 => #cores)
844 int
845 RcontribSimulManager::SetThreadCount(int nt)
846 {
847 if (!Ready()) {
848 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
849 return 0;
850 }
851 if (nt < 0)
852 return NThreads();
853 if (!nt) nt = GetNCores();
854 int status = 0;
855 if (nt == 1)
856 status = StopKids();
857 else if (nt < nkids)
858 status = StopKids(nkids-nt);
859 else if (nt > nkids)
860 StartKids(nt-nkids);
861 if (status) {
862 sprintf(errmsg, "non-zero (%d) status from child", status);
863 error(WARNING, errmsg);
864 }
865 return NThreads();
866 }