ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/rt/RcontribSimulManager.cpp
Revision: 2.3
Committed: Wed Oct 30 01:38:21 2024 UTC (6 months ago) by greg
Branch: MAIN
Changes since 2.2: +5 -4 lines
Log Message:
fix(rxcontrib): Fixes in file loading & process management

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: RcontribSimulManager.cpp,v 2.2 2024/10/29 19:47:19 greg Exp $";
3 #endif
4 /*
5 * RcontribSimulManager.cpp
6 *
7 * Rcontrib simulation manager implementation
8 *
9 * Created by Greg Ward on 10/17/2024.
10 */
11
12 #include <unistd.h>
13 #include <ctype.h>
14 #include "platform.h"
15 #include "selcall.h"
16 #include "RcontribSimulManager.h"
17 #include "func.h"
18 #include "resolu.h"
19 #include "source.h"
20
21 char RCCONTEXT[] = "RC.";
22
23 extern const char HDRSTR[];
24 extern const char BIGEND[];
25 extern const char FMTSTR[];
26
27 int contrib = 0; // computing contributions?
28
29 int xres = 0; // horizontal (scan) size
30 int yres = 0; // vertical resolution
31
32 // new/exclusive, overwrite if exists, or recover data
33 int RSDOflags[] = {RDSwrite|RDSexcl|RDSextend, RDSwrite|RDSextend,
34 RDSread|RDSwrite};
35
36 static const char ROWZEROSTR[] = "NROWS=0000000000000000\n";
37 #define LNROWSTR 6
38
39 // Modifier channel for recording contributions (no constructor/destructor)
40 struct RcontribMod {
41 RcontribOutput * opl; // pointer to first output channel
42 char * params; // parameters string
43 EPNODE * binv; // bin expression
44 int nbins; // bin count this modifier
45 int coffset; // column offset in bytes
46 DCOLORV cbin[1]; // bin accumulator (extends struct)
47 /// Access specific (spectral) color bin
48 DCOLORV * operator[](int n) {
49 if ((n < 0) | (n >= nbins)) return NULL;
50 return cbin + n*NCSAMP;
51 }
52 };
53
54 // Struct used to assign record calculation to child
55 struct RowAssignment {
56 uint32 row; // row to do
57 uint32 ac; // accumulation count
58 };
59
60 // Our default data share function
61 RdataShare *
62 defDataShare(const char *name, RCOutputOp op, size_t siz)
63 {
64 return new RdataShareMap(name, RSDOflags[op], siz);
65 }
66
67 // Allocate rcontrib accumulator
68 RcontribMod *
69 NewRcMod(const char *prms, const char *binexpr, int ncbins)
70 {
71 if (ncbins <= 0) return NULL;
72 if (!prms) prms = "";
73 if (!binexpr | (ncbins == 1))
74 binexpr = "0";
75
76 RcontribMod * mp = (RcontribMod *)ecalloc(1, sizeof(RcontribMod) +
77 sizeof(DCOLORV)*(NCSAMP*ncbins-1) +
78 strlen(prms)+1);
79
80 mp->params = strcpy((char *)(mp->cbin + ncbins*NCSAMP), prms);
81 mp->binv = eparse(const_cast<char *>(binexpr));
82 mp->nbins = ncbins;
83 return mp;
84 }
85
86 // Free an RcontribMod
87 void
88 FreeRcMod(void *p)
89 {
90 if (!p) return;
91 epfree((*(RcontribMod *)p).binv, true);
92 efree(p);
93 }
94
95 // Set output format ('f', 'd', or 'c')
96 bool
97 RcontribSimulManager::SetDataFormat(int ty)
98 {
99 if (outList) {
100 error(INTERNAL, "cannot call SetDataFormat() after AddModifier()");
101 return false;
102 }
103 switch (ty) {
104 case 'f':
105 dsiz = sizeof(float)*NCSAMP;
106 break;
107 case 'd':
108 dsiz = sizeof(double)*NCSAMP;
109 break;
110 case 'c':
111 dsiz = LSCOLR;
112 break;
113 default:
114 sprintf(errmsg, "unsupported output format '%c'", ty);
115 error(INTERNAL, errmsg);
116 return false;
117 }
118 dtyp = ty;
119 return true;
120 }
121
122 // static call-back for rcontrib ray-tracing
123 int
124 RcontribSimulManager::RctCall(RAY *r, void *cd)
125 {
126 if (!r->ro || r->ro->omod == OVOID) // hit nothing?
127 return 0;
128 // shadow ray not on source?
129 if (r->rsrc >= 0 && source[r->rsrc].so != r->ro)
130 return 0;
131
132 const char * mname = objptr(r->ro->omod)->oname;
133 RcontribSimulManager * rcp = (RcontribSimulManager *)cd;
134 RcontribMod * mp = (RcontribMod *)lu_find(&rcp->modLUT,mname)->data;
135 if (!mp)
136 return 0; // not in our modifier list
137
138 worldfunc(RCCONTEXT, r); // compute bin #
139 set_eparams(mp->params);
140 double bval = evalue(mp->binv);
141 if (bval <= -.5)
142 return 0; // silently ignore negative bin index
143 DCOLORV * dvp = (*mp)[int(bval + .5)];
144 if (!dvp) {
145 sprintf(errmsg, "bad bin number for '%s' (%.1f ignored)", mname, bval);
146 error(WARNING, errmsg);
147 return 0;
148 }
149 SCOLOR contr;
150 raycontrib(contr, r, PRIMARY); // compute coefficient
151 if (contrib)
152 smultscolor(contr, r->rcol); // -> value contribution
153 for (int i = 0; i < NCSAMP; i++)
154 *dvp++ += contr[i]; // accumulate color/spectrum
155 return 1;
156 }
157
// Look for a printf-style conversion in tst whose conversion character is
// one of those in match.  Returns the index of that conversion character
// within tst, or 0 if no such conversion is present.
static int
hasFormat(const char *tst, const char *match)
{
	static const char	convChars[] = "%diouxXfFeEgGaAcsb";
	const char *		cp = tst;

	for ( ; ; ) {
		if (*cp == '\0')
			return 0;		// ran out of string
		if (*cp++ != '%')
			continue;
						// skip flags, width, precision
		cp += strcspn(cp, convChars);
		if (*cp == '\0')
			return 0;		// dangling '%' at end
		if (strchr(match, *cp))
			return int(cp - tst);	// found requested conversion
		cp++;				// some other conversion
	}
}
175
176 // Add a modifier and arguments, opening output file(s)
177 bool
178 RcontribSimulManager::AddModifier(const char *modn, const char *outspec,
179 const char *prms, const char *binval, int bincnt)
180 {
181 if (!modn | !outspec | (bincnt <= 0) || !*modn | !*outspec) {
182 error(WARNING, "ignoring bad call to AddModifier()");
183 return false;
184 }
185 if (!nChan) { // initial call?
186 if (!SetDataFormat(dtyp))
187 return false;
188 nChan = NCSAMP;
189 } else if (nChan != NCSAMP) {
190 error(INTERNAL, "number of spectral channels must be fixed");
191 return false;
192 }
193 if (Ready()) {
194 error(INTERNAL, "call to AddModifier() after PrepOutput()");
195 return false;
196 }
197 LUENT * lp = lu_find(&modLUT, modn);
198 if (lp->data) {
199 sprintf(errmsg, "duplicate modifier '%s'", modn);
200 error(USER, errmsg);
201 return false;
202 }
203 if (!lp->key) // create new entry
204 lp->key = strcpy((char *)emalloc(strlen(modn)+1), modn);
205
206 RcontribMod * mp = NewRcMod(prms, binval, bincnt);
207 if (!mp) return false;
208 lp->data = (char *)mp;
209 const int modndx = hasFormat(outspec, "sb");
210 const int binndx = hasFormat(outspec, "diouxX");
211 int bin0 = 0;
212 char fnbuf[512];
213 if (!modndx | (modndx > binndx))
214 sprintf(fnbuf, outspec, bin0, modn);
215 else
216 sprintf(fnbuf, outspec, modn, bin0);
217 RcontribOutput * olast = NULL;
218 RcontribOutput * op;
219 for (op = outList; op; op = (olast=op)->next) {
220 if (strcmp(op->GetName(), fnbuf))
221 continue;
222 if (modndx) { // this ain't right
223 sprintf(errmsg, "output name collision for '%s'", fnbuf);
224 error(USER, errmsg);
225 return false;
226 }
227 if ((binndx > 0) & (xres > 0)) {
228 sprintf(fnbuf, outspec, ++bin0);
229 continue; // each bin goes to another image
230 }
231 mp->coffset = op->rowBytes; // else add to what's there
232 break;
233 }
234 if (!op) { // create new output channel?
235 op = new RcontribOutput(fnbuf);
236 if (olast) olast->next = op;
237 else outList = op;
238 }
239 mp->opl = op; // first (maybe only) output channel
240 if (modndx) // remember modifier if part of name
241 op->omod = lp->key;
242 if (binndx > 0) { // append output image/bin list
243 op->rowBytes += dsiz;
244 op->obin = bin0;
245 for (bincnt = 1; bincnt < mp->nbins; bincnt++) {
246 if (!modndx | (modndx > binndx))
247 sprintf(fnbuf, outspec, bin0+bincnt, modn);
248 else
249 sprintf(fnbuf, outspec, modn, bin0+bincnt);
250 if (!op->next) {
251 olast = op;
252 olast->next = op = new RcontribOutput(fnbuf);
253 if (modndx) op->omod = lp->key;
254 op->obin = bin0+bincnt;
255 } else {
256 op = op->next;
257 CHECK(op->obin != bin0+bincnt, CONSISTENCY,
258 "bin number mismatch in AddModifier()");
259 }
260 CHECK(op->rowBytes != mp->coffset, CONSISTENCY,
261 "row offset mismatch in AddModifier()");
262 op->rowBytes += dsiz;
263 }
264 } else // else send all results to this channel
265 op->rowBytes += bincnt*dsiz;
266 return true;
267 }
268
269 // Add a file of modifiers with associated arguments
270 bool
271 RcontribSimulManager::AddModFile(const char *modfn, const char *outspec,
272 const char *prms,
273 const char *binval, int bincnt)
274 {
275 char * path = getpath(const_cast<char *>(modfn),
276 getrlibpath(), R_OK);
277 FILE * fp;
278 if (!path || !(fp = fopen(path, "r"))) {
279 sprintf(errmsg, "cannot %s modifier file '%s'",
280 path ? "open" : "find", modfn);
281 error(SYSTEM, errmsg);
282 return false;
283 }
284 char mod[MAXSTR];
285 while (fgetword(mod, sizeof(mod), fp))
286 if (!AddModifier(mod, outspec, prms, binval, bincnt))
287 return false;
288 fclose(fp);
289 return true;
290 }
291
292 // Prepare output channels and return # completed rows
293 int
294 RcontribSimulManager::PrepOutput()
295 {
296 if (!outList || !RtraceSimulManager::Ready()) {
297 error(INTERNAL, "PrepOutput() called before octree & modifiers assigned");
298 return -1;
299 }
300 int remWarnings = 20;
301 for (RcontribOutput *op = outList; op; op = op->next) {
302 if (op->rData) {
303 error(INTERNAL, "output channel already open in PrepOutput()");
304 return -1;
305 }
306 op->nRows = yres * (xres + !xres);
307 op->rData = (*cdsF)(op->ofname, outOp,
308 GetHeadLen()+1024 + op->nRows*op->rowBytes);
309 freeqstr(op->ofname); op->ofname = NULL;
310 if (outOp == RCOrecover) {
311 int rd = op->CheckHeader(this);
312 if (rd < 0)
313 return -1;
314 if (rd >= op->nRows) {
315 if (remWarnings >= 0) {
316 sprintf(errmsg, "recovered output '%s' already done",
317 op->GetName());
318 error(WARNING, remWarnings ? errmsg : "etc...");
319 remWarnings--;
320 }
321 rd = op->nRows;
322 }
323 if (!rInPos | (rInPos > rd)) rInPos = rd;
324 } else if (!op->NewHeader(this))
325 return -1;
326 // make sure there's room
327 if (op->rData->GetSize() < op->begData + op->nRows*op->rowBytes &&
328 !op->rData->Resize(op->begData + op->nRows*op->rowBytes))
329 return -1; // calls error() for us
330 }
331 rowsDone.NewBitMap(outList->nRows); // create row completion map
332 rowsDone.ClearBits(0, rInPos, true);
333 return rInPos;
334 }
335
336 // Create header in open write-only channel
337 bool
338 RcontribOutput::NewHeader(const RcontribSimulManager *rcp)
339 {
340 int headlim = rcp->GetHeadLen() + 1024;
341 char * hdr = (char *)rData->GetMemory(0, headlim, 0);
342 int esiz;
343 const int etyp = rcp->GetFormat(&esiz);
344
345 strcpy(hdr, HDRSTR);
346 strcat(hdr, "RADIANCE\n");
347 begData = strlen(hdr);
348 strcpy(hdr+begData, rcp->GetHeadStr());
349 begData += rcp->GetHeadLen();
350 if (omod) {
351 sprintf(hdr+begData, "MODIFIER=%s\n", omod);
352 begData += strlen(hdr+begData);
353 }
354 if (obin >= 0) {
355 sprintf(hdr+begData, "BIN=%d\n", obin);
356 begData += strlen(hdr+begData);
357 }
358 strcpy(hdr+begData, ROWZEROSTR);
359 rowCountPos = begData+LNROWSTR;
360 begData += sizeof(ROWZEROSTR)-1;
361 if (!xres | (rowBytes > esiz)) {
362 sprintf(hdr+begData, "NCOLS=%d\n", int(rowBytes/esiz));
363 begData += strlen(hdr+begData);
364 }
365 sprintf(hdr+begData, "%s%d\n", NCOMPSTR, NCSAMP);
366 begData += strlen(hdr+begData);
367 if (NCSAMP > 3) {
368 sprintf(hdr+begData, "%s %f %f %f %f\n", WLSPLTSTR,
369 WLPART[0], WLPART[1], WLPART[2], WLPART[3]);
370 begData += strlen(hdr+begData);
371 }
372 if (etyp != 'c') {
373 sprintf(hdr+begData, "%s%d\n", BIGEND, nativebigendian());
374 begData += strlen(hdr+begData);
375 }
376 int align = 0;
377 switch (etyp) {
378 case 'f':
379 align = sizeof(float);
380 break;
381 case 'd':
382 align = sizeof(double);
383 break;
384 case 'c':
385 break;
386 default:
387 error(INTERNAL, "unsupported data type in NewHeader()");
388 return false;
389 }
390 strcpy(hdr+begData, FMTSTR); // write format string
391 begData += strlen(hdr+begData);
392 strcpy(hdr+begData, formstr(etyp));
393 begData += strlen(hdr+begData);
394 if (align) // align data at end of header
395 while ((begData+2) % align)
396 hdr[begData++] = ' ';
397 hdr[begData++] = '\n'; // EOL for data format
398 hdr[begData++] = '\n'; // end of nominal header
399 if ((xres > 0) & (rowBytes == esiz)) { // tack on resolution string?
400 sprintf(hdr+begData, PIXSTDFMT, yres, xres);
401 begData += strlen(hdr+begData);
402 }
403 return rData->ReleaseMemory(hdr, RDSwrite);
404 }
405
406 // find string argument in header for named variable
407 static const char *
408 findArgs(const char *hdr, const char *vnm, int len)
409 {
410 const char * npos = strnstr(hdr, vnm, len);
411
412 if (!npos) return NULL;
413
414 npos += strlen(vnm); // find start of (first) argument
415 len -= npos - hdr;
416 while (len-- > 0 && (*npos == '=') | isspace(*npos))
417 if (*npos++ == '\n')
418 return NULL;
419
420 return (len >= 0) ? npos : NULL;
421 }
422
423 // Load and check header in read/write channel
424 int
425 RcontribOutput::CheckHeader(const RcontribSimulManager *rcp)
426 {
427 int esiz;
428 const int etyp = rcp->GetFormat(&esiz);
429 const int maxlen = rcp->GetHeadLen() + 1024;
430 char * hdr = (char *)rData->GetMemory(0, maxlen, RDSread);
431 const char * cp;
432 // find end of header
433 if (!hdr || !(cp = strnstr(hdr, "\n\n", maxlen))) {
434 sprintf(errmsg, "cannot find end of header in '%s'", GetName());
435 error(USER, errmsg);
436 return -1;
437 }
438 begData = cp - hdr + 1; // increment again at end
439 // check # components
440 if (((cp = findArgs(hdr, NCOMPSTR, begData)) ? atoi(cp) : 3) != NCSAMP) {
441 sprintf(errmsg, "expected %s%d in '%s'", NCOMPSTR, NCSAMP, GetName());
442 error(USER, errmsg);
443 return -1;
444 }
445 // check format
446 if (!(cp = findArgs(hdr, FMTSTR, begData)) ||
447 strncmp(cp, formstr(etyp), strlen(formstr(etyp)))) {
448 sprintf(errmsg, "expected %s%s in '%s'", FMTSTR, formstr(etyp), GetName());
449 error(USER, errmsg);
450 return -1;
451 }
452 // check #columns
453 if (((cp = findArgs(hdr, "NCOLS=", begData)) && atoi(cp)*esiz != rowBytes) ||
454 !xres | (rowBytes > esiz)) {
455 sprintf(errmsg, "expected NCOLS=%d in '%s'",
456 int(rowBytes/esiz), GetName());
457 error(USER, errmsg);
458 return -1;
459 }
460 // find row count
461 if (!(cp = findArgs(hdr, "NROWS=", begData))) {
462 sprintf(errmsg, "missing NROWS in '%s'", GetName());
463 error(USER, errmsg);
464 return -1;
465 }
466 rowCountPos = cp - hdr;
467 int rlast = atoi(cp);
468 begData++; // advance past closing EOL
469 if ((xres > 0) & (rowBytes == esiz)) { // check/skip resolution string?
470 char rbuf[64];
471 sprintf(rbuf, PIXSTDFMT, yres, xres);
472 int rlen = strlen(rbuf);
473 if (strncmp(rbuf, hdr+begData, rlen)) {
474 sprintf(errmsg, "bad resolution string in '%s'", GetName());
475 error(USER, errmsg);
476 return -1;
477 }
478 begData += rlen;
479 }
480 // XXX assume the rest is OK: endianness, wavelength splits, modifier, bin
481 return rData->ReleaseMemory(hdr, 0) ? rlast : -1;
482 }
483
484 // Rewind calculation (previous results unchanged)
485 bool
486 RcontribSimulManager::ResetRow(int r)
487 {
488 if (!rowsDone.Length() | (0 > r) | (r >= rInPos)) {
489 error(WARNING, "ignoring bad call to ResetRow()");
490 return (r == rInPos);
491 }
492 FlushQueue(); // finish current and reset
493 for (RcontribOutput *op = outList; op; op = op->next)
494 if (!op->SetRowsDone(r))
495 return false;
496
497 rowsDone.ClearBits(r, rInPos-r, false);
498 rInPos = r;
499 return true;
500 }
501
502 // call-back for averaging each modifier's contributions to assigned channel(s)
503 static int
504 putModContrib(const LUENT *lp, void *p)
505 {
506 RcontribMod * mp = (RcontribMod *)lp->data;
507 const RcontribSimulManager * rcp = (const RcontribSimulManager *)p;
508 const double sca = 1./rcp->accum;
509 const DCOLORV * dvp = mp->cbin;
510 RcontribOutput * op = mp->opl;
511 int i, n;
512
513 switch (rcp->GetFormat()) { // conversion based on output type
514 case 'd': {
515 double * dvo = (double *)op->InsertionP(mp->coffset);
516 for (n = mp->nbins; n--; ) {
517 for (i = 0; i < NCSAMP; i++)
518 *dvo++ = *dvp++ * sca;
519 if ((op->obin >= 0) & (n > 0))
520 dvo = (double *)(op = op->Next())->InsertionP(mp->coffset);
521 }
522 } break;
523 case 'f': {
524 float * fvo = (float *)op->InsertionP(mp->coffset);
525 for (n = mp->nbins; n--; ) {
526 for (i = 0; i < NCSAMP; i++)
527 *fvo++ = float(*dvp++ * sca);
528 if ((op->obin >= 0) & (n > 0))
529 fvo = (float *)(op = op->Next())->InsertionP(mp->coffset);
530 }
531 } break;
532 case 'c': {
533 COLRV * cvo = (COLRV *)op->InsertionP(mp->coffset);
534 for (n = mp->nbins; n--; ) {
535 SCOLOR scol;
536 for (i = 0; i < NCSAMP; i++)
537 scol[i] = COLORV(*dvp++ * sca);
538 scolor_scolr(cvo, scol);
539 cvo += LSCOLR;
540 if ((op->obin >= 0) & (n > 0))
541 cvo = (COLRV *)(op = op->Next())->InsertionP(mp->coffset);
542 }
543 } break;
544 default:
545 error(CONSISTENCY, "unsupported output type in sendModContrib()");
546 return -1;
547 }
548 // clear for next tally
549 memset(mp->cbin, 0, sizeof(DCOLORV)*NCSAMP*mp->nbins);
550 return 1;
551 }
552
553 // Add a ray/bundle to compute next record
554 int
555 RcontribSimulManager::ComputeRecord(const FVECT orig_direc[])
556 {
557 if (!Ready())
558 return 0;
559 if (rInPos >= outList->nRows) {
560 error(WARNING, "ComputeRecord() called after last record");
561 return 0;
562 }
563 if (nkids > 0) { // in parent process?
564 int k = GetChild(); // updates output rows
565 if (k < 0) return -1; // can't really happen
566 RowAssignment rass;
567 rass.row = kidRow[k] = rInPos++;
568 rass.ac = accum;
569 if (write(kid[k].w, &rass, sizeof(rass)) != sizeof(rass) ||
570 writebuf(kid[k].w, orig_direc, sizeof(FVECT)*2*accum)
571 != sizeof(FVECT)*2*accum) {
572 error(SYSTEM, "cannot write to child; dead process?");
573 return -1;
574 }
575 return accum; // tracing/output happens in child
576 }
577 if (NCSAMP != nChan) {
578 error(INTERNAL, "number of color channels must remain fixed");
579 return -1;
580 }
581 // actual work is done here...
582 if (EnqueueBundle(orig_direc, accum) < 0)
583 return -1;
584
585 RcontribOutput * op; // prepare output buffers
586 for (op = outList; op; op = op->next)
587 if (!op->GetRow(rInPos))
588 return -1;
589 // convert averages & clear
590 if (lu_doall(&modLUT, putModContrib, this) < 0)
591 return -1;
592 // write buffers
593 for (op = outList; op; op = op->next)
594 op->DoneRow();
595
596 if (!nkids) // update row if solo process
597 UpdateRowsDone(rInPos++); // XXX increment here is critical
598
599 return accum;
600 }
601
602 // Get next available child, returning index or -1 if forceWait & idling
603 int
604 RcontribSimulManager::GetChild(bool forceWait)
605 {
606 if (nkids <= 0)
607 return -1;
608 // take inventory
609 int pn, n = 0;
610 fd_set writeset, errset;
611 FD_ZERO(&writeset); FD_ZERO(&errset);
612 for (pn = nkids; pn--; ) {
613 if (kidRow[pn] < 0) { // child already ready?
614 if (forceWait) continue;
615 return pn; // good enough
616 }
617 FD_SET(kid[pn].w, &writeset); // will check on this one
618 FD_SET(kid[pn].w, &errset);
619 if (kid[pn].w >= n)
620 n = kid[pn].w + 1;
621 }
622 if (!n) // every child is idle?
623 return -1;
624 // wait on "busy" child(ren)
625 while ((n = select(n, NULL, &writeset, &errset, NULL)) <= 0)
626 if (errno != EINTR) {
627 error(SYSTEM, "select call failed in GetChild()");
628 return -1;
629 }
630 pn = -1; // get flags set by select
631 for (n = nkids; n--; )
632 if (kidRow[n] >= 0 &&
633 FD_ISSET(kid[n].w, &writeset) |
634 FD_ISSET(kid[n].w, &errset)) {
635 // update output row counts
636 if (!FD_ISSET(kid[n].w, &errset))
637 UpdateRowsDone(kidRow[n]);
638 kidRow[n] = -1; // flag it available
639 pn = n;
640 }
641 return pn;
642 }
643
644 // Update row completion bitmap and update outputs (does not change rInPos)
645 bool
646 RcontribSimulManager::UpdateRowsDone(int r)
647 {
648 if (!rowsDone.TestAndSet(r)) {
649 error(WARNING, "redundant call to UpdateRowsDone()");
650 return false;
651 }
652 int nDone = GetRowFinished();
653 if (nDone <= r)
654 return true; // nothing to update, yet
655 for (RcontribOutput *op = outList; op; op = op->next)
656 if (!op->SetRowsDone(nDone))
657 return false;
658 return true; // up-to-date
659 }
660
661 // Run rcontrib child process (never returns)
662 void
663 RcontribSimulManager::RunChild()
664 {
665 FVECT * vecList = NULL;
666 RowAssignment rass;
667 ssize_t nr;
668
669 accum = 0;
670 errno = 0;
671 while ((nr = read(0, &rass, sizeof(rass))) == sizeof(rass)) {
672 if (!rass.ac) {
673 error(CONSISTENCY, "bad accumulator count in child");
674 exit(1);
675 }
676 if (rass.ac > accum)
677 vecList = (FVECT *)erealloc(vecList,
678 sizeof(FVECT)*2*rass.ac);
679 accum = rass.ac;
680 rInPos = rass.row;
681
682 if (readbuf(0, vecList, sizeof(FVECT)*2*accum) !=
683 sizeof(FVECT)*2*accum)
684 break;
685
686 if (ComputeRecord(vecList) <= 0)
687 exit(1);
688 }
689 if (nr) {
690 error(SYSTEM, "read error in child process");
691 exit(1);
692 }
693 exit(0);
694 }
695
696 // Add child processes as indicated
697 bool
698 RcontribSimulManager::StartKids(int n2go)
699 {
700 if ((n2go <= 0) | (nkids + n2go <= 1))
701 return false;
702
703 if (!nkids)
704 cow_memshare(); // preload objects
705
706 n2go += nkids; // => desired brood size
707 kid = (SUBPROC *)erealloc(kid, sizeof(SUBPROC)*n2go);
708 kidRow = (int32 *)erealloc(kidRow, sizeof(int32)*n2go);
709
710 fflush(stdout); // shouldn't use, anyway
711 while (nkids < n2go) {
712 kid[nkids] = sp_inactive;
713 kid[nkids].w = dup(1);
714 kid[nkids].flags |= PF_FILT_OUT;
715 int rv = open_process(&kid[nkids], NULL);
716 if (!rv) { // in child process?
717 while (nkids-- > 0)
718 close(kid[nkids].w);
719 free(kid); free(kidRow);
720 kid = NULL; kidRow = NULL;
721 RunChild(); // should never return
722 _exit(1);
723 }
724 if (rv < 0) {
725 error(SYSTEM, "cannot fork worker process");
726 return false;
727 }
728 kidRow[nkids++] = -1; // newborn is ready
729 }
730 return true;
731 }
732
733 // Reap the indicated number of children (all if 0)
734 int
735 RcontribSimulManager::StopKids(int n2end)
736 {
737 if (nkids <= 0)
738 return 0;
739 FlushQueue();
740 int status = 0;
741 if (!n2end | (n2end >= nkids)) { // end all subprocesses?
742 status = close_processes(kid, nkids);
743 free(kid); free(kidRow);
744 kid = NULL; kidRow = NULL;
745 nkids = 0;
746 } else { // else shrink family
747 int st;
748 while (n2end-- > 0)
749 if ((st = close_process(&kid[--nkids])) > 0)
750 status = st;
751 // could call realloc(), but it hardly matters
752 }
753 if (status) {
754 sprintf(errmsg, "non-zero (%d) status from child", status);
755 error(WARNING, errmsg);
756 }
757 return status;
758 }
759
760 // Set number of computation threads (0 => #cores)
761 int
762 RcontribSimulManager::SetThreadCount(int nt)
763 {
764 if (!Ready()) {
765 error(INTERNAL, "must call PrepOutput() before SetThreadCount()");
766 return 0;
767 }
768 if (nt < 0)
769 return nkids;
770 if (!nt) nt = GetNCores();
771 int status = 0;
772 if (nt == 1)
773 status = StopKids();
774 else if (nt < nkids)
775 status = StopKids(nkids-nt);
776 else if (nt > nkids)
777 StartKids(nt-nkids);
778 if (status) {
779 sprintf(errmsg, "non-zero (%d) status from child", status);
780 error(WARNING, errmsg);
781 }
782 return nkids;
783 }