ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.6
Committed: Thu Sep 18 16:10:19 1997 UTC (26 years, 7 months ago) by gregl
Content type: text/plain
Branch: MAIN
Changes since 2.5: +1 -0 lines
Log Message:
added include of sys/select.h for AIX machines

File Contents

# Content
1 /* Copyright (c) 1996 Regents of the University of California */
2
3 #ifndef lint
4 static char SCCSid[] = "$SunId$ LBL";
5 #endif
6
7 /*
8 * Parallel network process handling routines
9 */
10
11 #include <stdio.h>
12 #include <sys/types.h>
13 #include <sys/select.h>
14 #include <signal.h>
15 #include <fcntl.h>
16 #include "netproc.h"
17 #include "paths.h"
18 #include "vfork.h"
19 /* select call compatibility stuff */
20 #ifndef FD_SETSIZE
21 #include <sys/param.h>
22 #define FD_SETSIZE NOFILE /* maximum # select file descriptors */
23 #endif
24 #ifndef FD_SET
25 #ifndef NFDBITS
26 #define NFDBITS (8*sizeof(int)) /* number of bits per fd_mask */
27 #endif
28 #define FD_SET(n, p) ((p)->fds_bits[(n)/NFDBITS] |= (1 << ((n) % NFDBITS)))
29 #define FD_CLR(n, p) ((p)->fds_bits[(n)/NFDBITS] &= ~(1 << ((n) % NFDBITS)))
30 #define FD_ISSET(n, p) ((p)->fds_bits[(n)/NFDBITS] & (1 << ((n) % NFDBITS)))
31 #ifndef BSD
32 #define bzero(d,n) (void)memset(d,0,n)
33 #endif
34 #define FD_ZERO(p) bzero((char *)(p), sizeof(*(p)))
35 #endif
36
37 PSERVER *pslist = NULL; /* global process server list */
38
39 static PROC *pindex[FD_SETSIZE]; /* process index table */
40
41 static char ourhost[64]; /* this host name */
42 static char ourdir[MAXPATH]; /* our working directory */
43 static char ouruser[32]; /* our user name */
44 static char *ourshell; /* our user's shell */
45
46 static fd_set errdesc; /* error file descriptors */
47 static int maxfd; /* maximum assigned descriptor */
48
49 extern char *remsh; /* externally defined remote shell program */
50
51 extern char *malloc(), *realloc();
52 extern char *getenv();
53
54
55 PSERVER *
56 addpserver(host, dir, usr, np) /* add a new process server */
57 char *host, *dir, *usr;
58 int np;
59 {
60 register PSERVER *ps;
61 /* allocate the struct */
62 if (np < 1)
63 return(NULL);
64 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(PROC));
65 if (ps == NULL)
66 return(NULL);
67 if (!ourhost[0]) { /* initialize */
68 char dirtmp[MAXPATH];
69 register char *cp;
70 register int len;
71
72 strcpy(ourhost, myhostname());
73 getwd(dirtmp);
74 if ((cp = getenv("HOME")) != NULL) {
75 if (!strcmp(cp, dirtmp))
76 ourdir[0] = '\0';
77 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
78 dirtmp[len] == '/')
79 strcpy(ourdir, dirtmp+len+1);
80 else
81 strcpy(ourdir, dirtmp);
82 } else
83 strcpy(ourdir, dirtmp);
84 if ((cp = getenv("USER")) != NULL)
85 strcpy(ouruser, cp);
86 if ((ourshell = getenv("SHELL")) == NULL)
87 ourshell = "/bin/sh";
88 FD_ZERO(&errdesc);
89 maxfd = -1;
90 }
91 /* assign host, directory, user */
92 if (host == NULL || !strcmp(host, ourhost) ||
93 !strcmp(host, LHOSTNAME))
94 ps->hostname[0] = '\0';
95 else
96 strcpy(ps->hostname, host);
97 if (dir == NULL)
98 strcpy(ps->directory, ourdir);
99 else
100 strcpy(ps->directory, dir);
101 if (usr == NULL || !strcmp(usr, ouruser))
102 ps->username[0] = '\0';
103 else
104 strcpy(ps->username, usr);
105 /* clear process slots */
106 ps->nprocs = np;
107 while (np--) {
108 ps->proc[np].com = NULL;
109 ps->proc[np].pid = -1;
110 ps->proc[np].efd = -1;
111 ps->proc[np].errs = NULL;
112 ps->proc[np].elen = 0;
113 ps->proc[np].cf = NULL;
114 }
115 /* insert in our list */
116 ps->next = pslist;
117 pslist = ps;
118 /* check for signs of life */
119 if (!pserverOK(ps)) {
120 delpserver(ps); /* failure -- abort */
121 return(NULL);
122 }
123 return(ps);
124 }
125
126
127 delpserver(ps) /* delete a process server */
128 PSERVER *ps;
129 {
130 PSERVER pstart;
131 register PSERVER *psp;
132 register int i;
133 /* find server in our list */
134 pstart.next = pslist;
135 for (psp = &pstart; ps != psp->next; psp = psp->next)
136 if (psp->next == NULL)
137 return; /* not in our list! */
138 /* kill any running jobs */
139 for (i = 0; i < ps->nprocs; i++)
140 if (ps->proc[i].com != NULL) {
141 kill(SIGTERM, ps->proc[i].pid);
142 wait4job(ps, ps->proc[i].pid);
143 }
144 /* remove server from list */
145 psp->next = ps->next;
146 pslist = pstart.next;
147 free((char *)ps); /* free associated memory */
148 }
149
150
151 PSERVER *
152 findjob(pnp) /* find out where process is running */
153 register int *pnp; /* modified */
154 {
155 register PSERVER *ps;
156 register int i;
157
158 for (ps = pslist; ps != NULL; ps = ps->next)
159 for (i = 0; i < ps->nprocs; i++)
160 if (ps->proc[i].pid == *pnp) {
161 *pnp = i;
162 return(ps);
163 }
164 return(NULL); /* not found */
165 }
166
167
168 int
169 startjob(ps, command, compf) /* start a job on a process server */
170 register PSERVER *ps;
171 char *command;
172 int (*compf)();
173 {
174 char udirt[MAXPATH];
175 char *av[16];
176 int pfd[2], pid;
177 register int i;
178
179 if (ps == NULL) { /* find a server */
180 for (ps = pslist; ps != NULL; ps = ps->next)
181 if ((i = startjob(ps, command, compf)) != -1)
182 return(i); /* got one */
183 return(-1); /* no slots anywhere */
184 }
185 for (i = 0; i < ps->nprocs; i++)
186 if (ps->proc[i].com == NULL)
187 break;
188 if (i >= ps->nprocs)
189 return(-1); /* out of process slots */
190 /* open pipe */
191 if (pipe(pfd) < 0) {
192 perror("cannot open pipe");
193 exit(1);
194 }
195 /* start child process */
196 if ((pid = vfork()) == 0) {
197 close(pfd[0]); /* connect stderr to pipe */
198 if (pfd[1] != 2) {
199 dup2(pfd[1], 2);
200 close(pfd[1]);
201 }
202 if (ps->hostname[0]) { /* rsh command */
203 av[i=0] = remsh;
204 av[++i] = ps->hostname;
205 av[++i] = "-n"; /* no stdin */
206 if (ps->username[0]) { /* different user */
207 av[++i] = "-l";
208 av[++i] = ps->username;
209 av[++i] = "cd";
210 udirt[0] = '~';
211 strcpy(udirt+1, ouruser);
212 av[++i] = udirt;
213 av[++i] = ";";
214 }
215 if (ps->directory[0]) { /* change directory */
216 av[++i] = "cd";
217 av[++i] = ps->directory;
218 av[++i] = ";";
219 }
220 av[++i] = command;
221 av[++i] = NULL;
222 } else { /* shell command */
223 av[0] = ourshell;
224 av[1] = "-c";
225 av[2] = command;
226 av[3] = NULL;
227 }
228 execv(av[0], av);
229 _exit(1);
230 }
231 if (pid == -1) {
232 perror("fork failed");
233 exit(1);
234 }
235 ps->proc[i].com = command; /* assign process slot */
236 ps->proc[i].cf = compf;
237 ps->proc[i].pid = pid;
238 close(pfd[1]); /* get piped stderr file descriptor */
239 ps->proc[i].efd = pfd[0];
240 fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
241 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
242 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
243 if (pfd[0] > maxfd)
244 maxfd = pfd[0];
245 return(pid); /* return to parent process */
246 }
247
248
249 static int
250 readerrs(fd) /* read error output from fd */
251 int fd;
252 {
253 char errbuf[BUFSIZ];
254 int nr;
255 register PROC *pp;
256 /* look up associated process */
257 if ((pp = pindex[fd]) == NULL)
258 abort(); /* serious consistency error */
259 nr = read(fd, errbuf, BUFSIZ-1);
260 if (nr < 0) {
261 perror("read error");
262 exit(1);
263 }
264 if (nr == 0) /* stream closed (process finished) */
265 return(0);
266 errbuf[nr] = '\0'; /* add to error buffer */
267 if (pp->elen == 0)
268 pp->errs = (char *)malloc(nr+1);
269 else
270 pp->errs = (char *)realloc(pp->errs, pp->elen+nr+1);
271 if (pp->errs == NULL) {
272 perror("malloc failed");
273 exit(1);
274 }
275 strcpy(pp->errs+pp->elen, errbuf);
276 pp->elen += nr;
277 return(nr);
278 }
279
280
281 static
282 wait4end() /* read error streams until someone is done */
283 {
284 fd_set readfds, excepfds;
285 register int i;
286 /* find end of descriptor set */
287 for ( ; maxfd >= 0; maxfd--)
288 if (FD_ISSET(maxfd, &errdesc))
289 break;
290 if (maxfd < 0)
291 return; /* nothing to read */
292 readfds = excepfds = errdesc;
293 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
294 for (i = 0; i <= maxfd; i++) /* get pending i/o */
295 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
296 if (readerrs(i) == 0)
297 return; /* finished process */
298 perror("select call failed");
299 exit(1);
300 }
301
302
303 static int
304 finishjob(ps, pn, status) /* clean up finished process */
305 PSERVER *ps;
306 int pn;
307 int status;
308 {
309 register PROC *pp;
310
311 pp = ps->proc + pn;
312 if (pp->cf != NULL) /* client cleanup */
313 status = (*pp->cf)(ps, pn, status);
314 close(pp->efd); /* close error stream */
315 pindex[pp->efd] = NULL;
316 FD_CLR(pp->efd, &errdesc);
317 free((char *)pp->errs);
318 pp->com = NULL; /* clear settings */
319 pp->pid = -1;
320 pp->efd = -1;
321 pp->errs = NULL;
322 pp->elen = 0;
323 pp->cf = NULL;
324 return(status);
325 }
326
327
328 int
329 wait4job(ps, pid) /* wait for process to finish */
330 PSERVER *ps;
331 int pid;
332 {
333 int status, psn, psn2;
334 PSERVER *ps2;
335
336 if (pid == -1) { /* wait for first job */
337 if (ps != NULL) {
338 for (psn = ps->nprocs; psn--; )
339 if (ps->proc[psn].com != NULL)
340 break;
341 if (psn < 0)
342 return(-1); /* no processes this server */
343 }
344 do {
345 wait4end(); /* wait for something to end */
346 if ((psn2 = wait(&status)) == -1)
347 return(-1); /* none left */
348 ps2 = findjob(&psn2);
349 if (ps2 != NULL) /* clean up job if ours */
350 status = finishjob(ps2, psn2, status);
351 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
352 return(status); /* return job status */
353 }
354 psn = pid; /* else find specific job */
355 ps2 = findjob(&psn); /* find process slot */
356 if (ps2 == NULL || (ps != NULL && ps2 != ps))
357 return(-1); /* inconsistent target */
358 ps = ps2;
359 do {
360 wait4end(); /* wait for something to end */
361 if ((psn2 = wait(&status)) == -1)
362 return(-1); /* none left */
363 ps2 = findjob(&psn2);
364 if (ps2 != NULL) /* clean up job if ours */
365 status = finishjob(ps2, psn2, status);
366 } while (ps2 != ps || psn2 != psn);
367 return(status); /* return job status */
368 }