ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.14
Committed: Tue Nov 11 16:24:06 2003 UTC (20 years, 4 months ago) by greg
Content type: text/plain
Branch: MAIN
Changes since 2.13: +2 -2 lines
Log Message:
Replaced all calls to vfork() with regular fork() calls

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: netproc.c,v 2.13 2003/11/10 16:52:24 greg Exp $";
3 #endif
4 /*
5 * Parallel network process handling routines
6 */
7
8 #include <stdlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <signal.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14
15 #include "selcall.h"
16 #include "netproc.h"
17 #include "paths.h"
18
19 PSERVER *pslist = NULL; /* global process server list */
20
21 static NETPROC *pindex[FD_SETSIZE]; /* process index table */
22
23 static char ourhost[64]; /* this host name */
24 static char ourdir[PATH_MAX]; /* our working directory */
25 static char ouruser[32]; /* our user name */
26 static char *ourshell; /* our user's shell */
27
28 static fd_set errdesc; /* error file descriptors */
29 static int maxfd; /* maximum assigned descriptor */
30
31 extern char *remsh; /* externally defined remote shell program */
32
33
34 PSERVER *
35 addpserver(host, dir, usr, np) /* add a new process server */
36 char *host, *dir, *usr;
37 int np;
38 {
39 register PSERVER *ps;
40 /* allocate the struct */
41 if (np < 1)
42 return(NULL);
43 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(NETPROC));
44 if (ps == NULL)
45 return(NULL);
46 if (!ourhost[0]) { /* initialize */
47 char dirtmp[PATH_MAX];
48 register char *cp;
49 register int len;
50
51 strcpy(ourhost, myhostname());
52 getcwd(dirtmp, sizeof(dirtmp));
53 if ((cp = getenv("HOME")) != NULL) {
54 if (!strcmp(cp, dirtmp))
55 ourdir[0] = '\0';
56 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
57 dirtmp[len] == '/')
58 strcpy(ourdir, dirtmp+len+1);
59 else
60 strcpy(ourdir, dirtmp);
61 } else
62 strcpy(ourdir, dirtmp);
63 if ((cp = getenv("USER")) != NULL)
64 strcpy(ouruser, cp);
65 if ((ourshell = getenv("SHELL")) == NULL)
66 ourshell = "/bin/sh";
67 FD_ZERO(&errdesc);
68 maxfd = -1;
69 }
70 /* assign host, directory, user */
71 if (host == NULL || !strcmp(host, ourhost) ||
72 !strcmp(host, LHOSTNAME))
73 ps->hostname[0] = '\0';
74 else
75 strcpy(ps->hostname, host);
76 if (dir == NULL)
77 strcpy(ps->directory, ourdir);
78 else
79 strcpy(ps->directory, dir);
80 if (usr == NULL || !strcmp(usr, ouruser))
81 ps->username[0] = '\0';
82 else
83 strcpy(ps->username, usr);
84 /* clear process slots */
85 ps->nprocs = np;
86 while (np--) {
87 ps->proc[np].com = NULL;
88 ps->proc[np].pid = -1;
89 ps->proc[np].efd = -1;
90 ps->proc[np].errs = NULL;
91 ps->proc[np].elen = 0;
92 ps->proc[np].cf = NULL;
93 }
94 /* insert in our list */
95 ps->next = pslist;
96 pslist = ps;
97 /* check for signs of life */
98 if (!pserverOK(ps)) {
99 delpserver(ps); /* failure -- abort */
100 return(NULL);
101 }
102 return(ps);
103 }
104
105
106 delpserver(ps) /* delete a process server */
107 PSERVER *ps;
108 {
109 PSERVER pstart;
110 register PSERVER *psp;
111 register int i;
112 /* find server in our list */
113 pstart.next = pslist;
114 for (psp = &pstart; ps != psp->next; psp = psp->next)
115 if (psp->next == NULL)
116 return; /* not in our list! */
117 /* kill any running jobs */
118 for (i = 0; i < ps->nprocs; i++)
119 if (ps->proc[i].com != NULL) {
120 kill(SIGTERM, ps->proc[i].pid);
121 wait4job(ps, ps->proc[i].pid);
122 }
123 /* remove server from list */
124 psp->next = ps->next;
125 pslist = pstart.next;
126 free((void *)ps); /* free associated memory */
127 }
128
129
130 PSERVER *
131 findjob(pnp) /* find out where process is running */
132 register int *pnp; /* modified */
133 {
134 register PSERVER *ps;
135 register int i;
136
137 for (ps = pslist; ps != NULL; ps = ps->next)
138 for (i = 0; i < ps->nprocs; i++)
139 if (ps->proc[i].pid == *pnp) {
140 *pnp = i;
141 return(ps);
142 }
143 return(NULL); /* not found */
144 }
145
146
147 int
148 startjob(ps, command, compf) /* start a job on a process server */
149 register PSERVER *ps;
150 char *command;
151 int (*compf)();
152 {
153 char udirt[PATH_MAX];
154 char *av[16];
155 int pfd[2], pid;
156 register int i;
157
158 if (ps == NULL) { /* find a server */
159 for (ps = pslist; ps != NULL; ps = ps->next)
160 if ((i = startjob(ps, command, compf)) != -1)
161 return(i); /* got one */
162 return(-1); /* no slots anywhere */
163 }
164 for (i = 0; i < ps->nprocs; i++)
165 if (ps->proc[i].com == NULL)
166 break;
167 if (i >= ps->nprocs)
168 return(-1); /* out of process slots */
169 /* open pipe */
170 if (pipe(pfd) < 0) {
171 perror("cannot open pipe");
172 exit(1);
173 }
174 /* start child process */
175 if ((pid = fork()) == 0) {
176 close(pfd[0]); /* connect stderr to pipe */
177 if (pfd[1] != 2) {
178 dup2(pfd[1], 2);
179 close(pfd[1]);
180 }
181 if (ps->hostname[0]) { /* rsh command */
182 av[i=0] = remsh;
183 av[++i] = ps->hostname;
184 av[++i] = "-n"; /* no stdin */
185 if (ps->username[0]) { /* different user */
186 av[++i] = "-l";
187 av[++i] = ps->username;
188 av[++i] = "cd";
189 udirt[0] = '~';
190 strcpy(udirt+1, ouruser);
191 av[++i] = udirt;
192 av[++i] = ";";
193 }
194 if (ps->directory[0]) { /* change directory */
195 av[++i] = "cd";
196 av[++i] = ps->directory;
197 av[++i] = ";";
198 }
199 av[++i] = command;
200 av[++i] = NULL;
201 } else { /* shell command */
202 av[0] = ourshell;
203 av[1] = "-c";
204 av[2] = command;
205 av[3] = NULL;
206 }
207 execv(av[0], av);
208 _exit(1);
209 }
210 if (pid == -1) {
211 perror("fork failed");
212 exit(1);
213 }
214 ps->proc[i].com = command; /* assign process slot */
215 ps->proc[i].cf = compf;
216 ps->proc[i].pid = pid;
217 close(pfd[1]); /* get piped stderr file descriptor */
218 ps->proc[i].efd = pfd[0];
219 fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
220 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
221 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
222 if (pfd[0] > maxfd)
223 maxfd = pfd[0];
224 return(pid); /* return to parent process */
225 }
226
227
228 static int
229 readerrs(fd) /* read error output from fd */
230 int fd;
231 {
232 char errbuf[BUFSIZ];
233 int nr;
234 register NETPROC *pp;
235 /* look up associated process */
236 if ((pp = pindex[fd]) == NULL)
237 abort(); /* serious consistency error */
238 nr = read(fd, errbuf, BUFSIZ-1);
239 if (nr < 0) {
240 perror("read error");
241 exit(1);
242 }
243 if (nr == 0) /* stream closed (process finished) */
244 return(0);
245 errbuf[nr] = '\0'; /* add to error buffer */
246 if (pp->elen == 0)
247 pp->errs = (char *)malloc(nr+1);
248 else
249 pp->errs = (char *)realloc((void *)pp->errs, pp->elen+nr+1);
250 if (pp->errs == NULL) {
251 perror("malloc failed");
252 exit(1);
253 }
254 strcpy(pp->errs+pp->elen, errbuf);
255 pp->elen += nr;
256 return(nr);
257 }
258
259
260 static
261 wait4end() /* read error streams until someone is done */
262 {
263 fd_set readfds, excepfds;
264 register int i;
265 /* find end of descriptor set */
266 for ( ; maxfd >= 0; maxfd--)
267 if (FD_ISSET(maxfd, &errdesc))
268 break;
269 if (maxfd < 0)
270 return; /* nothing to read */
271 readfds = excepfds = errdesc;
272 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
273 for (i = 0; i <= maxfd; i++) /* get pending i/o */
274 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
275 if (readerrs(i) == 0)
276 return; /* finished process */
277 perror("select call failed");
278 exit(1);
279 }
280
281
282 static int
283 finishjob(ps, pn, status) /* clean up finished process */
284 PSERVER *ps;
285 int pn;
286 int status;
287 {
288 register NETPROC *pp;
289
290 pp = ps->proc + pn;
291 if (pp->cf != NULL) /* client cleanup */
292 status = (*pp->cf)(ps, pn, status);
293 close(pp->efd); /* close error stream */
294 pindex[pp->efd] = NULL;
295 FD_CLR(pp->efd, &errdesc);
296 free((void *)pp->errs);
297 pp->com = NULL; /* clear settings */
298 pp->pid = -1;
299 pp->efd = -1;
300 pp->errs = NULL;
301 pp->elen = 0;
302 pp->cf = NULL;
303 return(status);
304 }
305
306
307 int
308 wait4job(ps, pid) /* wait for process to finish */
309 PSERVER *ps;
310 int pid;
311 {
312 int status, psn, psn2;
313 PSERVER *ps2;
314
315 if (pid == -1) { /* wait for first job */
316 if (ps != NULL) {
317 for (psn = ps->nprocs; psn--; )
318 if (ps->proc[psn].com != NULL)
319 break;
320 if (psn < 0)
321 return(-1); /* no processes this server */
322 }
323 do {
324 wait4end(); /* wait for something to end */
325 if ((psn2 = wait(&status)) == -1)
326 return(-1); /* none left */
327 ps2 = findjob(&psn2);
328 if (ps2 != NULL) /* clean up job if ours */
329 status = finishjob(ps2, psn2, status);
330 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
331 return(status); /* return job status */
332 }
333 psn = pid; /* else find specific job */
334 ps2 = findjob(&psn); /* find process slot */
335 if (ps2 == NULL || (ps != NULL && ps2 != ps))
336 return(-1); /* inconsistent target */
337 ps = ps2;
338 do {
339 wait4end(); /* wait for something to end */
340 if ((psn2 = wait(&status)) == -1)
341 return(-1); /* none left */
342 ps2 = findjob(&psn2);
343 if (ps2 != NULL) /* clean up job if ours */
344 status = finishjob(ps2, psn2, status);
345 } while (ps2 != ps || psn2 != psn);
346 return(status); /* return job status */
347 }