ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.8
Committed: Tue Oct 28 14:01:39 1997 UTC (26 years, 6 months ago) by gregl
Content type: text/plain
Branch: MAIN
Changes since 2.7: +1 -21 lines
Log Message:
created selcall.h include file for select(2) compatibility

File Contents

# Content
1 /* Copyright (c) 1996 Regents of the University of California */
2
3 #ifndef lint
4 static char SCCSid[] = "$SunId$ LBL";
5 #endif
6
7 /*
8 * Parallel network process handling routines
9 */
10
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <fcntl.h>
#include "selcall.h"
#include "netproc.h"
#include "paths.h"
#include "vfork.h"
18
19 PSERVER *pslist = NULL; /* global process server list */
20
21 static PROC *pindex[FD_SETSIZE]; /* process index table */
22
23 static char ourhost[64]; /* this host name */
24 static char ourdir[MAXPATH]; /* our working directory */
25 static char ouruser[32]; /* our user name */
26 static char *ourshell; /* our user's shell */
27
28 static fd_set errdesc; /* error file descriptors */
29 static int maxfd; /* maximum assigned descriptor */
30
31 extern char *remsh; /* externally defined remote shell program */
32
33 extern char *malloc(), *realloc();
34 extern char *getenv();
35
36
37 PSERVER *
38 addpserver(host, dir, usr, np) /* add a new process server */
39 char *host, *dir, *usr;
40 int np;
41 {
42 register PSERVER *ps;
43 /* allocate the struct */
44 if (np < 1)
45 return(NULL);
46 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(PROC));
47 if (ps == NULL)
48 return(NULL);
49 if (!ourhost[0]) { /* initialize */
50 char dirtmp[MAXPATH];
51 register char *cp;
52 register int len;
53
54 strcpy(ourhost, myhostname());
55 getwd(dirtmp);
56 if ((cp = getenv("HOME")) != NULL) {
57 if (!strcmp(cp, dirtmp))
58 ourdir[0] = '\0';
59 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
60 dirtmp[len] == '/')
61 strcpy(ourdir, dirtmp+len+1);
62 else
63 strcpy(ourdir, dirtmp);
64 } else
65 strcpy(ourdir, dirtmp);
66 if ((cp = getenv("USER")) != NULL)
67 strcpy(ouruser, cp);
68 if ((ourshell = getenv("SHELL")) == NULL)
69 ourshell = "/bin/sh";
70 FD_ZERO(&errdesc);
71 maxfd = -1;
72 }
73 /* assign host, directory, user */
74 if (host == NULL || !strcmp(host, ourhost) ||
75 !strcmp(host, LHOSTNAME))
76 ps->hostname[0] = '\0';
77 else
78 strcpy(ps->hostname, host);
79 if (dir == NULL)
80 strcpy(ps->directory, ourdir);
81 else
82 strcpy(ps->directory, dir);
83 if (usr == NULL || !strcmp(usr, ouruser))
84 ps->username[0] = '\0';
85 else
86 strcpy(ps->username, usr);
87 /* clear process slots */
88 ps->nprocs = np;
89 while (np--) {
90 ps->proc[np].com = NULL;
91 ps->proc[np].pid = -1;
92 ps->proc[np].efd = -1;
93 ps->proc[np].errs = NULL;
94 ps->proc[np].elen = 0;
95 ps->proc[np].cf = NULL;
96 }
97 /* insert in our list */
98 ps->next = pslist;
99 pslist = ps;
100 /* check for signs of life */
101 if (!pserverOK(ps)) {
102 delpserver(ps); /* failure -- abort */
103 return(NULL);
104 }
105 return(ps);
106 }
107
108
109 delpserver(ps) /* delete a process server */
110 PSERVER *ps;
111 {
112 PSERVER pstart;
113 register PSERVER *psp;
114 register int i;
115 /* find server in our list */
116 pstart.next = pslist;
117 for (psp = &pstart; ps != psp->next; psp = psp->next)
118 if (psp->next == NULL)
119 return; /* not in our list! */
120 /* kill any running jobs */
121 for (i = 0; i < ps->nprocs; i++)
122 if (ps->proc[i].com != NULL) {
123 kill(SIGTERM, ps->proc[i].pid);
124 wait4job(ps, ps->proc[i].pid);
125 }
126 /* remove server from list */
127 psp->next = ps->next;
128 pslist = pstart.next;
129 free((char *)ps); /* free associated memory */
130 }
131
132
133 PSERVER *
134 findjob(pnp) /* find out where process is running */
135 register int *pnp; /* modified */
136 {
137 register PSERVER *ps;
138 register int i;
139
140 for (ps = pslist; ps != NULL; ps = ps->next)
141 for (i = 0; i < ps->nprocs; i++)
142 if (ps->proc[i].pid == *pnp) {
143 *pnp = i;
144 return(ps);
145 }
146 return(NULL); /* not found */
147 }
148
149
150 int
151 startjob(ps, command, compf) /* start a job on a process server */
152 register PSERVER *ps;
153 char *command;
154 int (*compf)();
155 {
156 char udirt[MAXPATH];
157 char *av[16];
158 int pfd[2], pid;
159 register int i;
160
161 if (ps == NULL) { /* find a server */
162 for (ps = pslist; ps != NULL; ps = ps->next)
163 if ((i = startjob(ps, command, compf)) != -1)
164 return(i); /* got one */
165 return(-1); /* no slots anywhere */
166 }
167 for (i = 0; i < ps->nprocs; i++)
168 if (ps->proc[i].com == NULL)
169 break;
170 if (i >= ps->nprocs)
171 return(-1); /* out of process slots */
172 /* open pipe */
173 if (pipe(pfd) < 0) {
174 perror("cannot open pipe");
175 exit(1);
176 }
177 /* start child process */
178 if ((pid = vfork()) == 0) {
179 close(pfd[0]); /* connect stderr to pipe */
180 if (pfd[1] != 2) {
181 dup2(pfd[1], 2);
182 close(pfd[1]);
183 }
184 if (ps->hostname[0]) { /* rsh command */
185 av[i=0] = remsh;
186 av[++i] = ps->hostname;
187 av[++i] = "-n"; /* no stdin */
188 if (ps->username[0]) { /* different user */
189 av[++i] = "-l";
190 av[++i] = ps->username;
191 av[++i] = "cd";
192 udirt[0] = '~';
193 strcpy(udirt+1, ouruser);
194 av[++i] = udirt;
195 av[++i] = ";";
196 }
197 if (ps->directory[0]) { /* change directory */
198 av[++i] = "cd";
199 av[++i] = ps->directory;
200 av[++i] = ";";
201 }
202 av[++i] = command;
203 av[++i] = NULL;
204 } else { /* shell command */
205 av[0] = ourshell;
206 av[1] = "-c";
207 av[2] = command;
208 av[3] = NULL;
209 }
210 execv(av[0], av);
211 _exit(1);
212 }
213 if (pid == -1) {
214 perror("fork failed");
215 exit(1);
216 }
217 ps->proc[i].com = command; /* assign process slot */
218 ps->proc[i].cf = compf;
219 ps->proc[i].pid = pid;
220 close(pfd[1]); /* get piped stderr file descriptor */
221 ps->proc[i].efd = pfd[0];
222 fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
223 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
224 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
225 if (pfd[0] > maxfd)
226 maxfd = pfd[0];
227 return(pid); /* return to parent process */
228 }
229
230
231 static int
232 readerrs(fd) /* read error output from fd */
233 int fd;
234 {
235 char errbuf[BUFSIZ];
236 int nr;
237 register PROC *pp;
238 /* look up associated process */
239 if ((pp = pindex[fd]) == NULL)
240 abort(); /* serious consistency error */
241 nr = read(fd, errbuf, BUFSIZ-1);
242 if (nr < 0) {
243 perror("read error");
244 exit(1);
245 }
246 if (nr == 0) /* stream closed (process finished) */
247 return(0);
248 errbuf[nr] = '\0'; /* add to error buffer */
249 if (pp->elen == 0)
250 pp->errs = (char *)malloc(nr+1);
251 else
252 pp->errs = (char *)realloc(pp->errs, pp->elen+nr+1);
253 if (pp->errs == NULL) {
254 perror("malloc failed");
255 exit(1);
256 }
257 strcpy(pp->errs+pp->elen, errbuf);
258 pp->elen += nr;
259 return(nr);
260 }
261
262
263 static
264 wait4end() /* read error streams until someone is done */
265 {
266 fd_set readfds, excepfds;
267 register int i;
268 /* find end of descriptor set */
269 for ( ; maxfd >= 0; maxfd--)
270 if (FD_ISSET(maxfd, &errdesc))
271 break;
272 if (maxfd < 0)
273 return; /* nothing to read */
274 readfds = excepfds = errdesc;
275 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
276 for (i = 0; i <= maxfd; i++) /* get pending i/o */
277 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
278 if (readerrs(i) == 0)
279 return; /* finished process */
280 perror("select call failed");
281 exit(1);
282 }
283
284
285 static int
286 finishjob(ps, pn, status) /* clean up finished process */
287 PSERVER *ps;
288 int pn;
289 int status;
290 {
291 register PROC *pp;
292
293 pp = ps->proc + pn;
294 if (pp->cf != NULL) /* client cleanup */
295 status = (*pp->cf)(ps, pn, status);
296 close(pp->efd); /* close error stream */
297 pindex[pp->efd] = NULL;
298 FD_CLR(pp->efd, &errdesc);
299 free((char *)pp->errs);
300 pp->com = NULL; /* clear settings */
301 pp->pid = -1;
302 pp->efd = -1;
303 pp->errs = NULL;
304 pp->elen = 0;
305 pp->cf = NULL;
306 return(status);
307 }
308
309
310 int
311 wait4job(ps, pid) /* wait for process to finish */
312 PSERVER *ps;
313 int pid;
314 {
315 int status, psn, psn2;
316 PSERVER *ps2;
317
318 if (pid == -1) { /* wait for first job */
319 if (ps != NULL) {
320 for (psn = ps->nprocs; psn--; )
321 if (ps->proc[psn].com != NULL)
322 break;
323 if (psn < 0)
324 return(-1); /* no processes this server */
325 }
326 do {
327 wait4end(); /* wait for something to end */
328 if ((psn2 = wait(&status)) == -1)
329 return(-1); /* none left */
330 ps2 = findjob(&psn2);
331 if (ps2 != NULL) /* clean up job if ours */
332 status = finishjob(ps2, psn2, status);
333 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
334 return(status); /* return job status */
335 }
336 psn = pid; /* else find specific job */
337 ps2 = findjob(&psn); /* find process slot */
338 if (ps2 == NULL || (ps != NULL && ps2 != ps))
339 return(-1); /* inconsistent target */
340 ps = ps2;
341 do {
342 wait4end(); /* wait for something to end */
343 if ((psn2 = wait(&status)) == -1)
344 return(-1); /* none left */
345 ps2 = findjob(&psn2);
346 if (ps2 != NULL) /* clean up job if ours */
347 status = finishjob(ps2, psn2, status);
348 } while (ps2 != ps || psn2 != psn);
349 return(status); /* return job status */
350 }