ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.1
Committed: Tue Oct 29 00:36:54 2024 UTC (6 months ago) by greg
Branch: MAIN
Log Message:
feat(rxcontrib): First compiled version of rxcontrib tool to test new C++ classes

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id$";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 extern int contrib; /* computing contributions? */
28 extern int lim_dist; /* limit distance? */
29
30 // new/exclusive, overwrite if exists, or recover data
31 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
32 RDSread|RDSwrite};
33
34 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
35 #define LNROWSTR 6
36
37 // Modifier channel for recording contributions (no constructor/destructor)
38 struct RcontribMod {
39 RcontribOutput * opl; // pointer to first output channel
40 char * params; // parameters string
41 EPNODE * binv; // bin expression
42 int nbins; // bin count this modifier
43 int coffset; // column offset in bytes
44 DCOLORV cbin[1]; // bin accumulator (extends struct)
45 /// Access specific (spectral) color bin
46 DCOLORV * operator[](int n) {
47 if ((n < 0) | (n >= nbins)) return NULL;
48 return cbin + n*NCSAMP;
49 }
50 };
51
52 // used to assign record calc to child
53 struct RowAssignment {
54 uint32 row; // row to do
55 uint32 ac; // accumulation count
56 };
57
58 // Our default data share function
59 RdataShare *
60 defDataShare(const char *name, RCOutputOp op, size_t siz)
61 {
62 return new RdataShareMap(name, RSDOflags[op], siz);
63 }
64
65 // Allocate rcontrib accumulator
66 RcontribMod *
67 NewRcMod(const char *prms, const char *binexpr, int ncbins)
68 {
69 if (ncbins <= 0) return NULL;
70 if (!prms) prms = "";
71 if (!binexpr | (ncbins == 1))
72 binexpr = "0";
73
74 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
75 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
76 strlen(prms)+1);
77
78 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
79 mp->binv = eparse(const_cast<char *>(binexpr));
80 mp->nbins = ncbins;
81 return mp;
82 }
83
84 // Free an RcontribMod
85 void
86 FreeRcMod(void *p)
87 {
88 if (!p) return;
89 epfree((*(RcontribMod *)p).binv, true);
90 efree(p);
91 }
92
93 // Set output format ('f', 'd', or 'c')
94 bool
95 RcontribSimulManager::SetDataFormat(int ty)
96 {
97 if (outList) {
98 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
99 return false;
100 }
101 switch (ty) {
102 case 'f':
103 dsiz = sizeof(float)*NCSAMP;
104 break;
105 case 'd':
106 dsiz = sizeof(double)*NCSAMP;
107 break;
108 case 'c':
109 dsiz = LSCOLR;
110 break;
111 default:
112 sprintf(errmsg, "unsupported output format '%c'", ty);
113 error(INTERNAL, errmsg);
114 return false;
115 }
116 dtyp = ty;
117 return true;
118 }
119
120 // static call-back for rcontrib ray-tracing
121 int
122 RcontribSimulManager::RctCall(RAY *r, void *cd)
123 {
124 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
125 return 0;
126 // shadow ray not on source?
127 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
128 return 0;
129
130 const char * mname = objptr(r->ro->omod)->oname;
131 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
132 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
133 if (!mp)
134 return 0; // not in our modifier list
135
136 worldfunc(RCCONTEXT, r); // compute bin #
137 set_eparams(mp->params);
138 double bval = evalue(mp->binv);
139 if (bval <= -.5)
140 return 0; // silently ignore negative bin index
141 DCOLORV * dvp = (*mp)[int(bval + .5)];
142 if (!dvp) {
143 sprintf(errmsg, "bad bin number for '%s' (%.1f ignored)", mname, bval);
144 error(WARNING, errmsg);
145 return 0;
146 }
147 SCOLOR contr;
148 raycontrib(contr, r, PRIMARY); // compute coefficient
149 if (contrib)
150 smultscolor(contr, r->rcol); // -> value contribution
151 for (int i = 0; i < NCSAMP; i++)
152 *dvp++ += contr[i]; // accumulate color/spectrum
153 return 1;
154 }
155
// Look for the first printf-style conversion in tst whose conversion
// character is one of those in match.
// Returns the index in tst of that conversion character, or 0 if there is
// none (0 is unambiguous since a '%' always precedes the character).
// A "%%" pair counts as a conversion on '%' and so is stepped over.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";

	for (const char *cp = tst; *cp; ) {
		if (*cp++ != '%')
			continue;
						// skip flags, width, etc.
		cp += strcspn(cp, convChars);
		if (!*cp)			// unterminated conversion
			return 0;
		if (strchr(match, *cp))
			return int(cp - tst);
		cp++;				// some other conversion
	}
	return 0;
}
173
174 // Add a modifier and arguments, opening output file(s)
175 bool
176 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
177 const char *prms, const char *binval, int bincnt)
178 {
179 if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
180 error(WARNING, "ignoring bad call to AddModifier()");
181 return false;
182 }
183 if (!nChan) { // initial call?
184 if (!SetDataFormat(dtyp))
185 return false;
186 nChan = NCSAMP;
187 } else if (nChan != NCSAMP) {
188 error(INTERNAL, "number of spectral channels must be fixed");
189 return false;
190 }
191 if (Ready()) {
192 error(INTERNAL, "call to AddModifier() after PrepOutput()");
193 return false;
194 }
195 LUENT * lp = lu_find(&modLUT, modn);
196 if (lp->data) {
197 sprintf(errmsg, "duplicate modifier '%s'", modn);
198 error(USER, errmsg);
199 return false;
200 }
201 if (!lp->key) // create new entry
202 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
203
204 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
205 if (!mp) return false;
206 lp->data = (char *)mp;
207 const int modndx = hasFormat(outspec, "sb");
208 const int binndx = hasFormat(outspec, "diouxX");
209 int bin0 = 0;
210 char fnbuf[512];
211 if (!modndx | (modndx > binndx))
212 sprintf(fnbuf, outspec, bin0, modn);
213 else
214 sprintf(fnbuf, outspec, modn, bin0);
215 RcontribOutput * olast = NULL;
216 RcontribOutput * op;
217 for (op = outList; op; op = (olast=op)->next) {
218 if (strcmp(op->GetName(), fnbuf))
219 continue;
220 if (modndx) { // this ain't right
221 sprintf(errmsg, "output name collision for '%s'", fnbuf);
222 error(USER, errmsg);
223 return false;
224 }
225 if ((binndx > 0) & (xres > 0)) {
226 sprintf(fnbuf, outspec, ++bin0);
227 continue; // each bin goes to another image
228 }
229 mp->coffset = op->rowBytes; // else add to what's there
230 break;
231 }
232 if (!op) { // create new output channel?
233 op = new RcontribOutput(fnbuf);
234 if (olast) olast->next = op;
235 else outList = op;
236 }
237 mp->opl = op; // first (maybe only) output channel
238 if (modndx) // remember modifier if part of name
239 op->omod = lp->key;
240 if (binndx > 0) { // append output image/bin list
241 op->rowBytes += dsiz;
242 op->obin = bin0;
243 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
244 if (!modndx | (modndx > binndx))
245 sprintf(fnbuf, outspec, bin0+bincnt, modn);
246 else
247 sprintf(fnbuf, outspec, modn, bin0+bincnt);
248 if (!op->next) {
249 olast = op;
250 olast->next = op = new RcontribOutput(fnbuf);
251 if (modndx) op->omod = lp->key;
252 op->obin = bin0+bincnt;
253 } else {
254 op = op->next;
255 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
256 "bin number mismatch in AddModifier()");
257 }
258 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
259 "row offset mismatch in AddModifier()");
260 op->rowBytes += dsiz;
261 }
262 } else // else send all results to this channel
263 op->rowBytes += bincnt*dsiz;
264 return true;
265 }
266
267 // Add a file of modifiers with associated arguments
268 bool
269 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
270 const char *prms,
271 const char *binval, int bincnt)
272 {
273 char * path = getpath(const_cast<char *>(modfn),
274 getrlibpath(), R_OK);
275 FILE * fp;
276 if (!path || !(fp = fopen(path, "r"))) {
277 sprintf(errmsg, "cannot %s modifier file '%s'",
278 path ? "open" : "find", modfn);
279 error(SYSTEM, errmsg);
280 return false;
281 }
282 char mod[MAXSTR];
283 while (fgetword(mod, sizeof(mod), fp))
284 if (!AddModifier(mod, outspec, prms, binval, bincnt))
285 return false;
286 fclose(fp);
287 return true;
288 }
289
290 // Run through current list of output struct's
291 int
292 RcontribSimulManager::GetOutputs(RoutputShareF *osF, void *cd) const
293 {
294 int cnt = 0;
295
296 for (const RcontribOutput *op = outList; op; op = op->next) {
297 int rv = 1;
298 if (osF && (rv = (*osF)(op, cd)) < 0)
299 return rv;
300 cnt += rv;
301 }
302 return cnt;
303 }
304
305 // Prepare output channels and return # completed rows
306 int
307 RcontribSimulManager::PrepOutput()
308 {
309 if (!outList || !RtraceSimulManager::Ready()) {
310 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
311 return -1;
312 }
313 int remWarnings = 20;
314 for (RcontribOutput *op = outList; op; op = op->next) {
315 if (op->rData) {
316 error(INTERNAL, "output channel already open in PrepOutput()");
317 return -1;
318 }
319 op->nRows = yres * (xres + !xres);
320 op->rData = (*cdsF)(op->ofname, outOp,
321 GetHeadLen()+1024 + op->nRows*op->rowBytes);
322 freeqstr(op->ofname); op->ofname = NULL;
323 if (outOp == RCOrecover) {
324 int rd = op->CheckHeader(this);
325 if (rd < 0)
326 return -1;
327 if (rd >= op->nRows) {
328 if (remWarnings >= 0) {
329 sprintf(errmsg, "recovered output '%s' already done",
330 op->GetName());
331 error(WARNING, remWarnings ? errmsg : "etc...");
332 remWarnings--;
333 }
334 rd = op->nRows;
335 }
336 if (!rInPos | (rInPos > rd)) rInPos = rd;
337 } else if (!op->NewHeader(this))
338 return -1;
339 // make sure there's room
340 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
341 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
342 return -1; // calls error() for us
343 }
344 if (lim_dist) // XXX where else to put this?
345 rtFlags |= RTlimDist;
346 else
347 rtFlags &= ~RTlimDist;
348
349 rowsDone.NewBitMap(outList->nRows); // create row completion map
350 rowsDone.ClearBits(0, rInPos, true);
351 return rInPos;
352 }
353
354 // Create header in open write-only channel
355 bool
356 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
357 {
358 int headlim = rcp->GetHeadLen() + 1024;
359 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
360 int esiz;
361 const int etyp = rcp->GetFormat(&esiz);
362
363 strcpy(hdr, HDRSTR);
364 strcat(hdr, "RADIANCE\n");
365 begData = strlen(hdr);
366 strcpy(hdr+begData, rcp->GetHeadStr());
367 begData += rcp->GetHeadLen();
368 if (omod) {
369 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
370 begData += strlen(hdr+begData);
371 }
372 if (obin >= 0) {
373 sprintf(hdr+begData, "BIN=%d\n", obin);
374 begData += strlen(hdr+begData);
375 }
376 strcpy(hdr+begData, ROWZEROSTR);
377 rowCountPos = begData+LNROWSTR;
378 begData += sizeof(ROWZEROSTR)-1;
379 if (!xres | (rowBytes > esiz)) {
380 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
381 begData += strlen(hdr+begData);
382 }
383 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
384 begData += strlen(hdr+begData);
385 if (NCSAMP > 3) {
386 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
387 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
388 begData += strlen(hdr+begData);
389 }
390 if (etyp != 'c') {
391 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
392 begData += strlen(hdr+begData);
393 }
394 int align = 0;
395 switch (etyp) {
396 case 'f':
397 align = sizeof(float);
398 break;
399 case 'd':
400 align = sizeof(double);
401 break;
402 case 'c':
403 break;
404 default:
405 error(INTERNAL, "unsupported data type in NewHeader()");
406 return false;
407 }
408 strcpy(hdr+begData, FMTSTR); // write format string
409 begData += strlen(hdr+begData);
410 strcpy(hdr+begData, formstr(etyp));
411 begData += strlen(hdr+begData);
412 if (align) // align data at end of header
413 while ((begData+2) % align)
414 hdr[begData++] = ' ';
415 hdr[begData++] = '\n'; // EOL for data format
416 hdr[begData++] = '\n'; // end of nominal header
417 if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
418 sprintf(hdr+begData, PIXSTDFMT, yres, xres);
419 begData += strlen(hdr+begData);
420 }
421 return rData->ReleaseMemory(hdr, RDSwrite);
422 }
423
424 // find string argument in header for named variable
425 static const char *
426 findArgs(const char *hdr, const char *vnm, int len)
427 {
428 const char * npos = strnstr(hdr, vnm, len);
429
430 if (!npos) return NULL;
431
432 npos += strlen(vnm); // find start of (first) argument
433 len -= npos - hdr;
434 while (len-- > 0 && (*npos == '=') | isspace(*npos))
435 if (*npos++ == '\n')
436 return NULL;
437
438 return (len >= 0) ? npos : NULL;
439 }
440
441 // Load and check header in read/write channel
442 int
443 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
444 {
445 int esiz;
446 const int etyp = rcp->GetFormat(&esiz);
447 const int maxlen = rcp->GetHeadLen() + 1024;
448 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
449 const char * cp;
450 // find end of header
451 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
452 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
453 error(USER, errmsg);
454 return -1;
455 }
456 begData = cp - hdr + 1; // increment again at end
457 // check # components
458 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
459 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
460 error(USER, errmsg);
461 return -1;
462 }
463 // check format
464 if (!(cp = findArgs(hdr, FMTSTR, begData)) || strcmp(cp, formstr(etyp))) {
465 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
466 error(USER, errmsg);
467 return -1;
468 }
469 // check #columns
470 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
471 !xres | (rowBytes > esiz)) {
472 sprintf(errmsg, "expected NCOLS=%d in '%s'",
473 int(rowBytes/esiz), GetName());
474 error(USER, errmsg);
475 return -1;
476 }
477 // find row count
478 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
479 sprintf(errmsg, "missing NROWS in '%s'", GetName());
480 error(USER, errmsg);
481 return -1;
482 }
483 rowCountPos = cp - hdr;
484 int rlast = atoi(cp);
485 begData++; // advance past closing EOL
486 if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
487 char rbuf[64];
488 sprintf(rbuf, PIXSTDFMT, yres, xres);
489 int rlen = strlen(rbuf);
490 if (strncmp(rbuf, hdr+begData, rlen)) {
491 sprintf(errmsg, "bad resolution string in '%s'", GetName());
492 error(USER, errmsg);
493 return -1;
494 }
495 begData += rlen;
496 }
497 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
498 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
499 }
500
501 // Rewind calculation (previous results unchanged)
502 bool
503 RcontribSimulManager::ResetRow(int r)
504 {
505 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
506 error(WARNING, "ignoring bad call to ResetRow()");
507 return (r == rInPos);
508 }
509 FlushQueue(); // finish current and reset
510 for (RcontribOutput *op = outList; op; op = op->next)
511 if (!op->SetRowsDone(r))
512 return false;
513
514 rowsDone.ClearBits(r, rInPos-r, false);
515 rInPos = r;
516 return true;
517 }
518
519 // call-back for averaging each modifier's contributions to assigned channel(s)
520 static int
521 putModContrib(const LUENT *lp, void *p)
522 {
523 RcontribMod * mp = (RcontribMod *)lp->data;
524 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
525 const double sca = 1./rcp->accum;
526 const DCOLORV * dvp = mp->cbin;
527 RcontribOutput * op = mp->opl;
528 int i, n;
529
530 switch (rcp->GetFormat()) { // conversion based on output type
531 case 'd': {
532 double * dvo = (double *)op->InsertionP(mp->coffset);
533 for (n = mp->nbins; n--; ) {
534 for (i = 0; i < NCSAMP; i++)
535 *dvo++ = *dvp++ * sca;
536 if ((op->obin >= 0) & (n > 0))
537 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
538 }
539 } break;
540 case 'f': {
541 float * fvo = (float *)op->InsertionP(mp->coffset);
542 for (n = mp->nbins; n--; ) {
543 for (i = 0; i < NCSAMP; i++)
544 *fvo++ = float(*dvp++ * sca);
545 if ((op->obin >= 0) & (n > 0))
546 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
547 }
548 } break;
549 case 'c': {
550 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
551 for (n = mp->nbins; n--; ) {
552 SCOLOR scol;
553 for (i = 0; i < NCSAMP; i++)
554 scol[i] = COLORV(*dvp++ * sca);
555 scolor_scolr(cvo, scol);
556 cvo += LSCOLR;
557 if ((op->obin >= 0) & (n > 0))
558 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
559 }
560 } break;
561 default:
562 error(CONSISTENCY, "unsupported output type in sendModContrib()");
563 return -1;
564 }
565 // clear for next tally
566 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
567 return 1;
568 }
569
570 // Add a ray/bundle to compute next record
571 int
572 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
573 {
574 if (!Ready())
575 return 0;
576 if (rInPos >= outList->nRows) {
577 error(WARNING, "ComputeRecord() called after last record");
578 return 0;
579 }
580 if (nkids > 0) { // in parent process?
581 int k = GetChild(); // updates output rows
582 if (k < 0) return -1; // can't really happen
583 RowAssignment rass;
584 rass.row = kidRow[k] = rInPos++;
585 rass.ac = accum;
586 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
587 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
588 != sizeof(FVECT)*2*accum) {
589 error(SYSTEM, "cannot write to child; dead process?");
590 return -1;
591 }
592 return accum; // tracing/output happens in child
593 }
594 if (NCSAMP != nChan) {
595 error(INTERNAL, "number of color channels must remain fixed");
596 return -1;
597 }
598 // actual work is done here...
599 if (EnqueueBundle(orig_direc, accum) < 0)
600 return -1;
601
602 RcontribOutput * op; // prepare output buffers
603 for (op = outList; op; op = op->next)
604 if (!op->GetRow(rInPos))
605 return -1;
606 // convert averages & clear
607 if (lu_doall(&modLUT, putModContrib, this) < 0)
608 return -1;
609 // write buffers
610 for (op = outList; op; op = op->next)
611 op->DoneRow();
612
613 if (!nkids) // update row if solo process
614 UpdateRowsDone(rInPos++); // XXX increment here is critical
615
616 return accum;
617 }
618
619 // Get next available child, returning index or -1 if forceWait & idling
620 int
621 RcontribSimulManager::GetChild(bool forceWait)
622 {
623 if (nkids <= 0)
624 return -1;
625 // take inventory
626 int pn, n = 0;
627 fd_set writeset, errset;
628 FD_ZERO(&writeset); FD_ZERO(&errset);
629 for (pn = nkids; pn--; ) {
630 if (kidRow[pn] < 0) { // child already ready?
631 if (forceWait) continue;
632 return pn; // good enough
633 }
634 FD_SET(kid[pn].w, &writeset); // will check on this one
635 FD_SET(kid[pn].w, &errset);
636 if (kid[pn].w >= n)
637 n = kid[pn].w + 1;
638 }
639 if (!n) // every child is idle?
640 return -1;
641 // wait on "busy" child(ren)
642 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
643 if (errno != EINTR) {
644 error(SYSTEM, "select call failed in GetChild()");
645 return -1;
646 }
647 pn = -1; // get flags set by select
648 for (n = nkids; n--; )
649 if (kidRow[n] >= 0 &&
650 FD_ISSET(kid[n].w, &writeset) |
651 FD_ISSET(kid[n].w, &errset)) {
652 // update output row counts
653 UpdateRowsDone(kidRow[n]);
654 kidRow[n] = -1; // flag it available
655 pn = n;
656 }
657 return pn;
658 }
659
660 // Update row completion bitmap and update outputs (does not change rInPos)
661 bool
662 RcontribSimulManager::UpdateRowsDone(int r)
663 {
664 if (!rowsDone.TestAndSet(r)) {
665 error(WARNING, "redundant call to UpdateRowsDone()");
666 return false;
667 }
668 int nDone = GetRowFinished();
669 if (nDone <= r)
670 return true; // nothing to update, yet
671 for (RcontribOutput *op = outList; op; op = op->next)
672 if (!op->SetRowsDone(nDone))
673 return false;
674 return true; // up-to-date
675 }
676
677 // Run rcontrib child process (never returns)
678 void
679 RcontribSimulManager::RunChild()
680 {
681 FVECT * vecList = NULL;
682 RowAssignment rass;
683 ssize_t nr;
684
685 accum = 0;
686 errno = 0;
687 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
688 if (!rass.ac) {
689 error(CONSISTENCY, "bad accumulator count in child");
690 exit(1);
691 }
692 if (rass.ac > accum)
693 vecList = (FVECT *)erealloc(vecList,
694 sizeof(FVECT)*2*rass.ac);
695 accum = rass.ac;
696 rInPos = rass.row;
697
698 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
699 sizeof(FVECT)*2*accum)
700 break;
701
702 if (ComputeRecord(vecList) <= 0)
703 exit(1);
704 }
705 if (nr) {
706 error(SYSTEM, "read error in child process");
707 exit(1);
708 }
709 exit(0);
710 }
711
712 // Add child processes as indicated
713 bool
714 RcontribSimulManager::StartKids(int n2go)
715 {
716 if ((n2go <= 0) | (nkids + n2go <= 1))
717 return false;
718
719 if (!nkids)
720 cow_memshare(); // preload objects
721
722 n2go += nkids; // => desired brood size
723 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
724 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
725
726 fflush(stdout); // shouldn't use, anyway
727 while (nkids < n2go) {
728 kid[nkids] = sp_inactive;
729 kid[nkids].w = dup(1);
730 kid[nkids].flags |= PF_FILT_OUT;
731 int rv = open_process(&kid[nkids], NULL);
732 if (!rv) { // in child process?
733 while (nkids-- > 0)
734 close(kid[nkids].w);
735 free(kid); free(kidRow);
736 kid = NULL; kidRow = NULL;
737 rowsDone.NewBitMap(0);
738 RunChild(); // should never return
739 _exit(1);
740 }
741 if (rv < 0) {
742 error(SYSTEM, "cannot fork worker process");
743 return false;
744 }
745 kidRow[nkids++] = -1; // newborn is ready
746 }
747 return true;
748 }
749
750 // Reap the indicated number of children (all if 0)
751 int
752 RcontribSimulManager::StopKids(int n2end)
753 {
754 if (nkids <= 0)
755 return 0;
756 FlushQueue();
757 int status = 0;
758 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
759 status = close_processes(kid, nkids);
760 free(kid); free(kidRow);
761 kid = NULL; kidRow = NULL;
762 nkids = 0;
763 } else { // else shrink family
764 int st;
765 while (n2end-- > 0)
766 if ((st = close_process(&kid[--nkids])) > 0)
767 status = st;
768 // could call realloc(), but it hardly matters
769 }
770 if (status) {
771 sprintf(errmsg, "non-zero (%d) status from child", status);
772 error(WARNING, errmsg);
773 }
774 return status;
775 }
776
777 // Set number of computation threads (0 => #cores)
778 int
779 RcontribSimulManager::SetThreadCount(int nt)
780 {
781 if (!Ready()) {
782 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
783 return 0;
784 }
785 if (nt < 0)
786 return nkids;
787 if (!nt) nt = GetNCores();
788 int status = 0;
789 if (nt == 1)
790 status = StopKids();
791 else if (nt < nkids)
792 status = StopKids(nkids-nt);
793 else if (nt > nkids)
794 StartKids(nt-nkids);
795 if (status) {
796 sprintf(errmsg, "non-zero (%d) status from child", status);
797 error(WARNING, errmsg);
798 }
799 return nkids;
800 }