ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.7
Committed: Thu Oct 16 13:48:49 1997 UTC (26 years, 5 months ago) by gregl
Content type: text/plain
Branch: MAIN
Changes since 2.6: +2 -0 lines
Log Message:
added ifdef for INCL_SEL_H since not every system has it

File Contents

# User Rev Content
1 greg 2.1 /* Copyright (c) 1996 Regents of the University of California */
2    
3     #ifndef lint
4     static char SCCSid[] = "$SunId$ LBL";
5     #endif
6    
7     /*
8     * Parallel network process handling routines
9     */
10    
11     #include <stdio.h>
12     #include <sys/types.h>
13 gregl 2.7 #ifdef INCL_SEL_H
14 gregl 2.6 #include <sys/select.h>
15 gregl 2.7 #endif
16 greg 2.1 #include <signal.h>
17     #include <fcntl.h>
18     #include "netproc.h"
19 greg 2.5 #include "paths.h"
20 greg 2.1 #include "vfork.h"
21     /* select call compatibility stuff */
22     #ifndef FD_SETSIZE
23     #include <sys/param.h>
24     #define FD_SETSIZE NOFILE /* maximum # select file descriptors */
25     #endif
26     #ifndef FD_SET
27     #ifndef NFDBITS
28     #define NFDBITS (8*sizeof(int)) /* number of bits per fd_mask */
29     #endif
30     #define FD_SET(n, p) ((p)->fds_bits[(n)/NFDBITS] |= (1 << ((n) % NFDBITS)))
31     #define FD_CLR(n, p) ((p)->fds_bits[(n)/NFDBITS] &= ~(1 << ((n) % NFDBITS)))
32     #define FD_ISSET(n, p) ((p)->fds_bits[(n)/NFDBITS] & (1 << ((n) % NFDBITS)))
33     #ifndef BSD
34     #define bzero(d,n) (void)memset(d,0,n)
35     #endif
36     #define FD_ZERO(p) bzero((char *)(p), sizeof(*(p)))
37     #endif
38    
39     PSERVER *pslist = NULL; /* global process server list */
40    
41     static PROC *pindex[FD_SETSIZE]; /* process index table */
42    
43     static char ourhost[64]; /* this host name */
44 greg 2.5 static char ourdir[MAXPATH]; /* our working directory */
45 greg 2.1 static char ouruser[32]; /* our user name */
46     static char *ourshell; /* our user's shell */
47    
48     static fd_set errdesc; /* error file descriptors */
49     static int maxfd; /* maximum assigned descriptor */
50    
51 greg 2.3 extern char *remsh; /* externally defined remote shell program */
52    
53 greg 2.1 extern char *malloc(), *realloc();
54     extern char *getenv();
55    
56    
57     PSERVER *
58     addpserver(host, dir, usr, np) /* add a new process server */
59     char *host, *dir, *usr;
60     int np;
61     {
62     register PSERVER *ps;
63     /* allocate the struct */
64     if (np < 1)
65     return(NULL);
66     ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(PROC));
67     if (ps == NULL)
68     return(NULL);
69     if (!ourhost[0]) { /* initialize */
70 greg 2.5 char dirtmp[MAXPATH];
71 greg 2.1 register char *cp;
72     register int len;
73    
74 greg 2.2 strcpy(ourhost, myhostname());
75 greg 2.4 getwd(dirtmp);
76 greg 2.1 if ((cp = getenv("HOME")) != NULL) {
77     if (!strcmp(cp, dirtmp))
78     ourdir[0] = '\0';
79     else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
80     dirtmp[len] == '/')
81     strcpy(ourdir, dirtmp+len+1);
82     else
83     strcpy(ourdir, dirtmp);
84     } else
85     strcpy(ourdir, dirtmp);
86     if ((cp = getenv("USER")) != NULL)
87     strcpy(ouruser, cp);
88     if ((ourshell = getenv("SHELL")) == NULL)
89     ourshell = "/bin/sh";
90     FD_ZERO(&errdesc);
91     maxfd = -1;
92     }
93     /* assign host, directory, user */
94     if (host == NULL || !strcmp(host, ourhost) ||
95     !strcmp(host, LHOSTNAME))
96     ps->hostname[0] = '\0';
97     else
98     strcpy(ps->hostname, host);
99     if (dir == NULL)
100     strcpy(ps->directory, ourdir);
101     else
102     strcpy(ps->directory, dir);
103     if (usr == NULL || !strcmp(usr, ouruser))
104     ps->username[0] = '\0';
105     else
106     strcpy(ps->username, usr);
107     /* clear process slots */
108     ps->nprocs = np;
109     while (np--) {
110     ps->proc[np].com = NULL;
111     ps->proc[np].pid = -1;
112     ps->proc[np].efd = -1;
113     ps->proc[np].errs = NULL;
114     ps->proc[np].elen = 0;
115     ps->proc[np].cf = NULL;
116     }
117     /* insert in our list */
118     ps->next = pslist;
119     pslist = ps;
120     /* check for signs of life */
121     if (!pserverOK(ps)) {
122     delpserver(ps); /* failure -- abort */
123     return(NULL);
124     }
125     return(ps);
126     }
127    
128    
129     delpserver(ps) /* delete a process server */
130     PSERVER *ps;
131     {
132     PSERVER pstart;
133     register PSERVER *psp;
134     register int i;
135     /* find server in our list */
136     pstart.next = pslist;
137     for (psp = &pstart; ps != psp->next; psp = psp->next)
138     if (psp->next == NULL)
139     return; /* not in our list! */
140     /* kill any running jobs */
141     for (i = 0; i < ps->nprocs; i++)
142     if (ps->proc[i].com != NULL) {
143     kill(SIGTERM, ps->proc[i].pid);
144     wait4job(ps, ps->proc[i].pid);
145     }
146     /* remove server from list */
147     psp->next = ps->next;
148     pslist = pstart.next;
149     free((char *)ps); /* free associated memory */
150     }
151    
152    
153     PSERVER *
154     findjob(pnp) /* find out where process is running */
155     register int *pnp; /* modified */
156     {
157     register PSERVER *ps;
158     register int i;
159    
160     for (ps = pslist; ps != NULL; ps = ps->next)
161     for (i = 0; i < ps->nprocs; i++)
162     if (ps->proc[i].pid == *pnp) {
163     *pnp = i;
164     return(ps);
165     }
166     return(NULL); /* not found */
167     }
168    
169    
170     int
171     startjob(ps, command, compf) /* start a job on a process server */
172     register PSERVER *ps;
173     char *command;
174     int (*compf)();
175     {
176 greg 2.5 char udirt[MAXPATH];
177 greg 2.3 char *av[16];
178 greg 2.1 int pfd[2], pid;
179     register int i;
180    
181     if (ps == NULL) { /* find a server */
182     for (ps = pslist; ps != NULL; ps = ps->next)
183     if ((i = startjob(ps, command, compf)) != -1)
184     return(i); /* got one */
185     return(-1); /* no slots anywhere */
186     }
187     for (i = 0; i < ps->nprocs; i++)
188     if (ps->proc[i].com == NULL)
189     break;
190     if (i >= ps->nprocs)
191     return(-1); /* out of process slots */
192     /* open pipe */
193     if (pipe(pfd) < 0) {
194     perror("cannot open pipe");
195     exit(1);
196     }
197     /* start child process */
198     if ((pid = vfork()) == 0) {
199     close(pfd[0]); /* connect stderr to pipe */
200     if (pfd[1] != 2) {
201     dup2(pfd[1], 2);
202     close(pfd[1]);
203     }
204     if (ps->hostname[0]) { /* rsh command */
205 greg 2.3 av[i=0] = remsh;
206 greg 2.1 av[++i] = ps->hostname;
207 greg 2.3 av[++i] = "-n"; /* no stdin */
208     if (ps->username[0]) { /* different user */
209 greg 2.1 av[++i] = "-l";
210     av[++i] = ps->username;
211 greg 2.3 av[++i] = "cd";
212     udirt[0] = '~';
213     strcpy(udirt+1, ouruser);
214     av[++i] = udirt;
215     av[++i] = ";";
216 greg 2.1 }
217 greg 2.3 if (ps->directory[0]) { /* change directory */
218 greg 2.1 av[++i] = "cd";
219     av[++i] = ps->directory;
220     av[++i] = ";";
221     }
222     av[++i] = command;
223     av[++i] = NULL;
224     } else { /* shell command */
225     av[0] = ourshell;
226     av[1] = "-c";
227     av[2] = command;
228     av[3] = NULL;
229     }
230     execv(av[0], av);
231     _exit(1);
232     }
233     if (pid == -1) {
234     perror("fork failed");
235     exit(1);
236     }
237     ps->proc[i].com = command; /* assign process slot */
238     ps->proc[i].cf = compf;
239     ps->proc[i].pid = pid;
240     close(pfd[1]); /* get piped stderr file descriptor */
241     ps->proc[i].efd = pfd[0];
242     fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
243     pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
244     FD_SET(pfd[0], &errdesc); /* add to select call parameter */
245     if (pfd[0] > maxfd)
246     maxfd = pfd[0];
247     return(pid); /* return to parent process */
248     }
249    
250    
251     static int
252     readerrs(fd) /* read error output from fd */
253     int fd;
254     {
255     char errbuf[BUFSIZ];
256     int nr;
257     register PROC *pp;
258     /* look up associated process */
259     if ((pp = pindex[fd]) == NULL)
260     abort(); /* serious consistency error */
261     nr = read(fd, errbuf, BUFSIZ-1);
262     if (nr < 0) {
263     perror("read error");
264     exit(1);
265     }
266     if (nr == 0) /* stream closed (process finished) */
267     return(0);
268     errbuf[nr] = '\0'; /* add to error buffer */
269     if (pp->elen == 0)
270     pp->errs = (char *)malloc(nr+1);
271     else
272     pp->errs = (char *)realloc(pp->errs, pp->elen+nr+1);
273     if (pp->errs == NULL) {
274     perror("malloc failed");
275     exit(1);
276     }
277     strcpy(pp->errs+pp->elen, errbuf);
278     pp->elen += nr;
279     return(nr);
280     }
281    
282    
283     static
284     wait4end() /* read error streams until someone is done */
285     {
286     fd_set readfds, excepfds;
287     register int i;
288     /* find end of descriptor set */
289     for ( ; maxfd >= 0; maxfd--)
290     if (FD_ISSET(maxfd, &errdesc))
291     break;
292     if (maxfd < 0)
293     return; /* nothing to read */
294     readfds = excepfds = errdesc;
295     while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
296     for (i = 0; i <= maxfd; i++) /* get pending i/o */
297     if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
298     if (readerrs(i) == 0)
299     return; /* finished process */
300     perror("select call failed");
301     exit(1);
302     }
303    
304    
305     static int
306     finishjob(ps, pn, status) /* clean up finished process */
307     PSERVER *ps;
308     int pn;
309     int status;
310     {
311     register PROC *pp;
312    
313     pp = ps->proc + pn;
314     if (pp->cf != NULL) /* client cleanup */
315     status = (*pp->cf)(ps, pn, status);
316     close(pp->efd); /* close error stream */
317     pindex[pp->efd] = NULL;
318     FD_CLR(pp->efd, &errdesc);
319     free((char *)pp->errs);
320     pp->com = NULL; /* clear settings */
321     pp->pid = -1;
322     pp->efd = -1;
323     pp->errs = NULL;
324     pp->elen = 0;
325     pp->cf = NULL;
326     return(status);
327     }
328    
329    
330     int
331     wait4job(ps, pid) /* wait for process to finish */
332     PSERVER *ps;
333     int pid;
334     {
335     int status, psn, psn2;
336     PSERVER *ps2;
337    
338     if (pid == -1) { /* wait for first job */
339     if (ps != NULL) {
340     for (psn = ps->nprocs; psn--; )
341     if (ps->proc[psn].com != NULL)
342     break;
343     if (psn < 0)
344     return(-1); /* no processes this server */
345     }
346     do {
347     wait4end(); /* wait for something to end */
348     if ((psn2 = wait(&status)) == -1)
349     return(-1); /* none left */
350     ps2 = findjob(&psn2);
351     if (ps2 != NULL) /* clean up job if ours */
352     status = finishjob(ps2, psn2, status);
353     } while (ps2 == NULL || (ps != NULL && ps2 != ps));
354     return(status); /* return job status */
355     }
356     psn = pid; /* else find specific job */
357     ps2 = findjob(&psn); /* find process slot */
358     if (ps2 == NULL || (ps != NULL && ps2 != ps))
359     return(-1); /* inconsistent target */
360     ps = ps2;
361     do {
362     wait4end(); /* wait for something to end */
363     if ((psn2 = wait(&status)) == -1)
364     return(-1); /* none left */
365     ps2 = findjob(&psn2);
366     if (ps2 != NULL) /* clean up job if ours */
367     status = finishjob(ps2, psn2, status);
368     } while (ps2 != ps || psn2 != psn);
369     return(status); /* return job status */
370     }