ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.3
Committed: Tue Feb 20 15:55:01 1996 UTC (28 years, 1 month ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.2: +13 -15 lines
Log Message:
added RSH variable to rename or replace remote shell execution

File Contents

# Content
1 /* Copyright (c) 1996 Regents of the University of California */
2
3 #ifndef lint
4 static char SCCSid[] = "$SunId$ LBL";
5 #endif
6
7 /*
8 * Parallel network process handling routines
9 */
10
11 #include <stdio.h>
12 #include <sys/types.h>
13 #include <signal.h>
14 #include <fcntl.h>
15 #include "netproc.h"
16 #include "vfork.h"
17 /* select call compatibility stuff */
18 #ifndef FD_SETSIZE
19 #include <sys/param.h>
20 #define FD_SETSIZE NOFILE /* maximum # select file descriptors */
21 #endif
22 #ifndef FD_SET
23 #ifndef NFDBITS
24 #define NFDBITS (8*sizeof(int)) /* number of bits per fd_mask */
25 #endif
26 #define FD_SET(n, p) ((p)->fds_bits[(n)/NFDBITS] |= (1 << ((n) % NFDBITS)))
27 #define FD_CLR(n, p) ((p)->fds_bits[(n)/NFDBITS] &= ~(1 << ((n) % NFDBITS)))
28 #define FD_ISSET(n, p) ((p)->fds_bits[(n)/NFDBITS] & (1 << ((n) % NFDBITS)))
29 #ifndef BSD
30 #define bzero(d,n) (void)memset(d,0,n)
31 #endif
32 #define FD_ZERO(p) bzero((char *)(p), sizeof(*(p)))
33 #endif
34
35 PSERVER *pslist = NULL; /* global process server list */
36
37 static PROC *pindex[FD_SETSIZE]; /* process index table */
38
39 static char ourhost[64]; /* this host name */
40 static char ourdir[128]; /* our working directory */
41 static char ouruser[32]; /* our user name */
42 static char *ourshell; /* our user's shell */
43
44 static fd_set errdesc; /* error file descriptors */
45 static int maxfd; /* maximum assigned descriptor */
46
47 extern char *remsh; /* externally defined remote shell program */
48
49 extern char *malloc(), *realloc();
50 extern char *getenv();
51
52
53 PSERVER *
54 addpserver(host, dir, usr, np) /* add a new process server */
55 char *host, *dir, *usr;
56 int np;
57 {
58 register PSERVER *ps;
59 /* allocate the struct */
60 if (np < 1)
61 return(NULL);
62 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(PROC));
63 if (ps == NULL)
64 return(NULL);
65 if (!ourhost[0]) { /* initialize */
66 char dirtmp[128];
67 register char *cp;
68 register int len;
69
70 strcpy(ourhost, myhostname());
71 getcwd(dirtmp, sizeof(dirtmp));
72 if ((cp = getenv("HOME")) != NULL) {
73 if (!strcmp(cp, dirtmp))
74 ourdir[0] = '\0';
75 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
76 dirtmp[len] == '/')
77 strcpy(ourdir, dirtmp+len+1);
78 else
79 strcpy(ourdir, dirtmp);
80 } else
81 strcpy(ourdir, dirtmp);
82 if ((cp = getenv("USER")) != NULL)
83 strcpy(ouruser, cp);
84 if ((ourshell = getenv("SHELL")) == NULL)
85 ourshell = "/bin/sh";
86 FD_ZERO(&errdesc);
87 maxfd = -1;
88 }
89 /* assign host, directory, user */
90 if (host == NULL || !strcmp(host, ourhost) ||
91 !strcmp(host, LHOSTNAME))
92 ps->hostname[0] = '\0';
93 else
94 strcpy(ps->hostname, host);
95 if (dir == NULL)
96 strcpy(ps->directory, ourdir);
97 else
98 strcpy(ps->directory, dir);
99 if (usr == NULL || !strcmp(usr, ouruser))
100 ps->username[0] = '\0';
101 else
102 strcpy(ps->username, usr);
103 /* clear process slots */
104 ps->nprocs = np;
105 while (np--) {
106 ps->proc[np].com = NULL;
107 ps->proc[np].pid = -1;
108 ps->proc[np].efd = -1;
109 ps->proc[np].errs = NULL;
110 ps->proc[np].elen = 0;
111 ps->proc[np].cf = NULL;
112 }
113 /* insert in our list */
114 ps->next = pslist;
115 pslist = ps;
116 /* check for signs of life */
117 if (!pserverOK(ps)) {
118 delpserver(ps); /* failure -- abort */
119 return(NULL);
120 }
121 return(ps);
122 }
123
124
125 delpserver(ps) /* delete a process server */
126 PSERVER *ps;
127 {
128 PSERVER pstart;
129 register PSERVER *psp;
130 register int i;
131 /* find server in our list */
132 pstart.next = pslist;
133 for (psp = &pstart; ps != psp->next; psp = psp->next)
134 if (psp->next == NULL)
135 return; /* not in our list! */
136 /* kill any running jobs */
137 for (i = 0; i < ps->nprocs; i++)
138 if (ps->proc[i].com != NULL) {
139 kill(SIGTERM, ps->proc[i].pid);
140 wait4job(ps, ps->proc[i].pid);
141 }
142 /* remove server from list */
143 psp->next = ps->next;
144 pslist = pstart.next;
145 free((char *)ps); /* free associated memory */
146 }
147
148
149 PSERVER *
150 findjob(pnp) /* find out where process is running */
151 register int *pnp; /* modified */
152 {
153 register PSERVER *ps;
154 register int i;
155
156 for (ps = pslist; ps != NULL; ps = ps->next)
157 for (i = 0; i < ps->nprocs; i++)
158 if (ps->proc[i].pid == *pnp) {
159 *pnp = i;
160 return(ps);
161 }
162 return(NULL); /* not found */
163 }
164
165
166 int
167 startjob(ps, command, compf) /* start a job on a process server */
168 register PSERVER *ps;
169 char *command;
170 int (*compf)();
171 {
172 char udirt[128];
173 char *av[16];
174 int pfd[2], pid;
175 register int i;
176
177 if (ps == NULL) { /* find a server */
178 for (ps = pslist; ps != NULL; ps = ps->next)
179 if ((i = startjob(ps, command, compf)) != -1)
180 return(i); /* got one */
181 return(-1); /* no slots anywhere */
182 }
183 for (i = 0; i < ps->nprocs; i++)
184 if (ps->proc[i].com == NULL)
185 break;
186 if (i >= ps->nprocs)
187 return(-1); /* out of process slots */
188 /* open pipe */
189 if (pipe(pfd) < 0) {
190 perror("cannot open pipe");
191 exit(1);
192 }
193 /* start child process */
194 if ((pid = vfork()) == 0) {
195 close(pfd[0]); /* connect stderr to pipe */
196 if (pfd[1] != 2) {
197 dup2(pfd[1], 2);
198 close(pfd[1]);
199 }
200 if (ps->hostname[0]) { /* rsh command */
201 av[i=0] = remsh;
202 av[++i] = ps->hostname;
203 av[++i] = "-n"; /* no stdin */
204 if (ps->username[0]) { /* different user */
205 av[++i] = "-l";
206 av[++i] = ps->username;
207 av[++i] = "cd";
208 udirt[0] = '~';
209 strcpy(udirt+1, ouruser);
210 av[++i] = udirt;
211 av[++i] = ";";
212 }
213 if (ps->directory[0]) { /* change directory */
214 av[++i] = "cd";
215 av[++i] = ps->directory;
216 av[++i] = ";";
217 }
218 av[++i] = command;
219 av[++i] = NULL;
220 } else { /* shell command */
221 av[0] = ourshell;
222 av[1] = "-c";
223 av[2] = command;
224 av[3] = NULL;
225 }
226 execv(av[0], av);
227 _exit(1);
228 }
229 if (pid == -1) {
230 perror("fork failed");
231 exit(1);
232 }
233 ps->proc[i].com = command; /* assign process slot */
234 ps->proc[i].cf = compf;
235 ps->proc[i].pid = pid;
236 close(pfd[1]); /* get piped stderr file descriptor */
237 ps->proc[i].efd = pfd[0];
238 fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
239 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
240 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
241 if (pfd[0] > maxfd)
242 maxfd = pfd[0];
243 return(pid); /* return to parent process */
244 }
245
246
247 static int
248 readerrs(fd) /* read error output from fd */
249 int fd;
250 {
251 char errbuf[BUFSIZ];
252 int nr;
253 register PROC *pp;
254 /* look up associated process */
255 if ((pp = pindex[fd]) == NULL)
256 abort(); /* serious consistency error */
257 nr = read(fd, errbuf, BUFSIZ-1);
258 if (nr < 0) {
259 perror("read error");
260 exit(1);
261 }
262 if (nr == 0) /* stream closed (process finished) */
263 return(0);
264 errbuf[nr] = '\0'; /* add to error buffer */
265 if (pp->elen == 0)
266 pp->errs = (char *)malloc(nr+1);
267 else
268 pp->errs = (char *)realloc(pp->errs, pp->elen+nr+1);
269 if (pp->errs == NULL) {
270 perror("malloc failed");
271 exit(1);
272 }
273 strcpy(pp->errs+pp->elen, errbuf);
274 pp->elen += nr;
275 return(nr);
276 }
277
278
279 static
280 wait4end() /* read error streams until someone is done */
281 {
282 fd_set readfds, excepfds;
283 register int i;
284 /* find end of descriptor set */
285 for ( ; maxfd >= 0; maxfd--)
286 if (FD_ISSET(maxfd, &errdesc))
287 break;
288 if (maxfd < 0)
289 return; /* nothing to read */
290 readfds = excepfds = errdesc;
291 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
292 for (i = 0; i <= maxfd; i++) /* get pending i/o */
293 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
294 if (readerrs(i) == 0)
295 return; /* finished process */
296 perror("select call failed");
297 exit(1);
298 }
299
300
301 static int
302 finishjob(ps, pn, status) /* clean up finished process */
303 PSERVER *ps;
304 int pn;
305 int status;
306 {
307 register PROC *pp;
308
309 pp = ps->proc + pn;
310 if (pp->cf != NULL) /* client cleanup */
311 status = (*pp->cf)(ps, pn, status);
312 close(pp->efd); /* close error stream */
313 pindex[pp->efd] = NULL;
314 FD_CLR(pp->efd, &errdesc);
315 free((char *)pp->errs);
316 pp->com = NULL; /* clear settings */
317 pp->pid = -1;
318 pp->efd = -1;
319 pp->errs = NULL;
320 pp->elen = 0;
321 pp->cf = NULL;
322 return(status);
323 }
324
325
326 int
327 wait4job(ps, pid) /* wait for process to finish */
328 PSERVER *ps;
329 int pid;
330 {
331 int status, psn, psn2;
332 PSERVER *ps2;
333
334 if (pid == -1) { /* wait for first job */
335 if (ps != NULL) {
336 for (psn = ps->nprocs; psn--; )
337 if (ps->proc[psn].com != NULL)
338 break;
339 if (psn < 0)
340 return(-1); /* no processes this server */
341 }
342 do {
343 wait4end(); /* wait for something to end */
344 if ((psn2 = wait(&status)) == -1)
345 return(-1); /* none left */
346 ps2 = findjob(&psn2);
347 if (ps2 != NULL) /* clean up job if ours */
348 status = finishjob(ps2, psn2, status);
349 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
350 return(status); /* return job status */
351 }
352 psn = pid; /* else find specific job */
353 ps2 = findjob(&psn); /* find process slot */
354 if (ps2 == NULL || (ps != NULL && ps2 != ps))
355 return(-1); /* inconsistent target */
356 ps = ps2;
357 do {
358 wait4end(); /* wait for something to end */
359 if ((psn2 = wait(&status)) == -1)
360 return(-1); /* none left */
361 ps2 = findjob(&psn2);
362 if (ps2 != NULL) /* clean up job if ours */
363 status = finishjob(ps2, psn2, status);
364 } while (ps2 != ps || psn2 != psn);
365 return(status); /* return job status */
366 }