ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.9
Committed: Sat Feb 22 02:07:30 2003 UTC (21 years, 1 month ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad3R5
Changes since 2.8: +3 -7 lines
Log Message:
Changes and check-in for 3.5 release
Includes new source files and modifications not recorded for many years
See ray/doc/notes/ReleaseNotes for notes between 3.1 and 3.5 release

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 greg 2.9 static const char RCSid[] = "$Id$";
3 greg 2.1 #endif
4     /*
5     * Parallel network process handling routines
6     */
7    
8     #include <stdio.h>
9     #include <signal.h>
10     #include <fcntl.h>
11 gregl 2.8 #include "selcall.h"
12 greg 2.1 #include "netproc.h"
13 greg 2.5 #include "paths.h"
14 greg 2.1 #include "vfork.h"
15    
16     PSERVER *pslist = NULL; /* global process server list */
17    
18     static PROC *pindex[FD_SETSIZE]; /* process index table */
19    
20     static char ourhost[64]; /* this host name */
21 greg 2.5 static char ourdir[MAXPATH]; /* our working directory */
22 greg 2.1 static char ouruser[32]; /* our user name */
23     static char *ourshell; /* our user's shell */
24    
25     static fd_set errdesc; /* error file descriptors */
26     static int maxfd; /* maximum assigned descriptor */
27    
28 greg 2.3 extern char *remsh; /* externally defined remote shell program */
29    
30 greg 2.1 extern char *getenv();
31    
32    
33     PSERVER *
34     addpserver(host, dir, usr, np) /* add a new process server */
35     char *host, *dir, *usr;
36     int np;
37     {
38     register PSERVER *ps;
39     /* allocate the struct */
40     if (np < 1)
41     return(NULL);
42     ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(PROC));
43     if (ps == NULL)
44     return(NULL);
45     if (!ourhost[0]) { /* initialize */
46 greg 2.5 char dirtmp[MAXPATH];
47 greg 2.1 register char *cp;
48     register int len;
49    
50 greg 2.2 strcpy(ourhost, myhostname());
51 greg 2.4 getwd(dirtmp);
52 greg 2.1 if ((cp = getenv("HOME")) != NULL) {
53     if (!strcmp(cp, dirtmp))
54     ourdir[0] = '\0';
55     else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
56     dirtmp[len] == '/')
57     strcpy(ourdir, dirtmp+len+1);
58     else
59     strcpy(ourdir, dirtmp);
60     } else
61     strcpy(ourdir, dirtmp);
62     if ((cp = getenv("USER")) != NULL)
63     strcpy(ouruser, cp);
64     if ((ourshell = getenv("SHELL")) == NULL)
65     ourshell = "/bin/sh";
66     FD_ZERO(&errdesc);
67     maxfd = -1;
68     }
69     /* assign host, directory, user */
70     if (host == NULL || !strcmp(host, ourhost) ||
71     !strcmp(host, LHOSTNAME))
72     ps->hostname[0] = '\0';
73     else
74     strcpy(ps->hostname, host);
75     if (dir == NULL)
76     strcpy(ps->directory, ourdir);
77     else
78     strcpy(ps->directory, dir);
79     if (usr == NULL || !strcmp(usr, ouruser))
80     ps->username[0] = '\0';
81     else
82     strcpy(ps->username, usr);
83     /* clear process slots */
84     ps->nprocs = np;
85     while (np--) {
86     ps->proc[np].com = NULL;
87     ps->proc[np].pid = -1;
88     ps->proc[np].efd = -1;
89     ps->proc[np].errs = NULL;
90     ps->proc[np].elen = 0;
91     ps->proc[np].cf = NULL;
92     }
93     /* insert in our list */
94     ps->next = pslist;
95     pslist = ps;
96     /* check for signs of life */
97     if (!pserverOK(ps)) {
98     delpserver(ps); /* failure -- abort */
99     return(NULL);
100     }
101     return(ps);
102     }
103    
104    
105     delpserver(ps) /* delete a process server */
106     PSERVER *ps;
107     {
108     PSERVER pstart;
109     register PSERVER *psp;
110     register int i;
111     /* find server in our list */
112     pstart.next = pslist;
113     for (psp = &pstart; ps != psp->next; psp = psp->next)
114     if (psp->next == NULL)
115     return; /* not in our list! */
116     /* kill any running jobs */
117     for (i = 0; i < ps->nprocs; i++)
118     if (ps->proc[i].com != NULL) {
119     kill(SIGTERM, ps->proc[i].pid);
120     wait4job(ps, ps->proc[i].pid);
121     }
122     /* remove server from list */
123     psp->next = ps->next;
124     pslist = pstart.next;
125 greg 2.9 free((void *)ps); /* free associated memory */
126 greg 2.1 }
127    
128    
129     PSERVER *
130     findjob(pnp) /* find out where process is running */
131     register int *pnp; /* modified */
132     {
133     register PSERVER *ps;
134     register int i;
135    
136     for (ps = pslist; ps != NULL; ps = ps->next)
137     for (i = 0; i < ps->nprocs; i++)
138     if (ps->proc[i].pid == *pnp) {
139     *pnp = i;
140     return(ps);
141     }
142     return(NULL); /* not found */
143     }
144    
145    
146     int
147     startjob(ps, command, compf) /* start a job on a process server */
148     register PSERVER *ps;
149     char *command;
150     int (*compf)();
151     {
152 greg 2.5 char udirt[MAXPATH];
153 greg 2.3 char *av[16];
154 greg 2.1 int pfd[2], pid;
155     register int i;
156    
157     if (ps == NULL) { /* find a server */
158     for (ps = pslist; ps != NULL; ps = ps->next)
159     if ((i = startjob(ps, command, compf)) != -1)
160     return(i); /* got one */
161     return(-1); /* no slots anywhere */
162     }
163     for (i = 0; i < ps->nprocs; i++)
164     if (ps->proc[i].com == NULL)
165     break;
166     if (i >= ps->nprocs)
167     return(-1); /* out of process slots */
168     /* open pipe */
169     if (pipe(pfd) < 0) {
170     perror("cannot open pipe");
171     exit(1);
172     }
173     /* start child process */
174     if ((pid = vfork()) == 0) {
175     close(pfd[0]); /* connect stderr to pipe */
176     if (pfd[1] != 2) {
177     dup2(pfd[1], 2);
178     close(pfd[1]);
179     }
180     if (ps->hostname[0]) { /* rsh command */
181 greg 2.3 av[i=0] = remsh;
182 greg 2.1 av[++i] = ps->hostname;
183 greg 2.3 av[++i] = "-n"; /* no stdin */
184     if (ps->username[0]) { /* different user */
185 greg 2.1 av[++i] = "-l";
186     av[++i] = ps->username;
187 greg 2.3 av[++i] = "cd";
188     udirt[0] = '~';
189     strcpy(udirt+1, ouruser);
190     av[++i] = udirt;
191     av[++i] = ";";
192 greg 2.1 }
193 greg 2.3 if (ps->directory[0]) { /* change directory */
194 greg 2.1 av[++i] = "cd";
195     av[++i] = ps->directory;
196     av[++i] = ";";
197     }
198     av[++i] = command;
199     av[++i] = NULL;
200     } else { /* shell command */
201     av[0] = ourshell;
202     av[1] = "-c";
203     av[2] = command;
204     av[3] = NULL;
205     }
206     execv(av[0], av);
207     _exit(1);
208     }
209     if (pid == -1) {
210     perror("fork failed");
211     exit(1);
212     }
213     ps->proc[i].com = command; /* assign process slot */
214     ps->proc[i].cf = compf;
215     ps->proc[i].pid = pid;
216     close(pfd[1]); /* get piped stderr file descriptor */
217     ps->proc[i].efd = pfd[0];
218     fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
219     pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
220     FD_SET(pfd[0], &errdesc); /* add to select call parameter */
221     if (pfd[0] > maxfd)
222     maxfd = pfd[0];
223     return(pid); /* return to parent process */
224     }
225    
226    
227     static int
228     readerrs(fd) /* read error output from fd */
229     int fd;
230     {
231     char errbuf[BUFSIZ];
232     int nr;
233     register PROC *pp;
234     /* look up associated process */
235     if ((pp = pindex[fd]) == NULL)
236     abort(); /* serious consistency error */
237     nr = read(fd, errbuf, BUFSIZ-1);
238     if (nr < 0) {
239     perror("read error");
240     exit(1);
241     }
242     if (nr == 0) /* stream closed (process finished) */
243     return(0);
244     errbuf[nr] = '\0'; /* add to error buffer */
245     if (pp->elen == 0)
246     pp->errs = (char *)malloc(nr+1);
247     else
248     pp->errs = (char *)realloc(pp->errs, pp->elen+nr+1);
249     if (pp->errs == NULL) {
250     perror("malloc failed");
251     exit(1);
252     }
253     strcpy(pp->errs+pp->elen, errbuf);
254     pp->elen += nr;
255     return(nr);
256     }
257    
258    
259     static
260     wait4end() /* read error streams until someone is done */
261     {
262     fd_set readfds, excepfds;
263     register int i;
264     /* find end of descriptor set */
265     for ( ; maxfd >= 0; maxfd--)
266     if (FD_ISSET(maxfd, &errdesc))
267     break;
268     if (maxfd < 0)
269     return; /* nothing to read */
270     readfds = excepfds = errdesc;
271     while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
272     for (i = 0; i <= maxfd; i++) /* get pending i/o */
273     if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
274     if (readerrs(i) == 0)
275     return; /* finished process */
276     perror("select call failed");
277     exit(1);
278     }
279    
280    
281     static int
282     finishjob(ps, pn, status) /* clean up finished process */
283     PSERVER *ps;
284     int pn;
285     int status;
286     {
287     register PROC *pp;
288    
289     pp = ps->proc + pn;
290     if (pp->cf != NULL) /* client cleanup */
291     status = (*pp->cf)(ps, pn, status);
292     close(pp->efd); /* close error stream */
293     pindex[pp->efd] = NULL;
294     FD_CLR(pp->efd, &errdesc);
295 greg 2.9 free((void *)pp->errs);
296 greg 2.1 pp->com = NULL; /* clear settings */
297     pp->pid = -1;
298     pp->efd = -1;
299     pp->errs = NULL;
300     pp->elen = 0;
301     pp->cf = NULL;
302     return(status);
303     }
304    
305    
306     int
307     wait4job(ps, pid) /* wait for process to finish */
308     PSERVER *ps;
309     int pid;
310     {
311     int status, psn, psn2;
312     PSERVER *ps2;
313    
314     if (pid == -1) { /* wait for first job */
315     if (ps != NULL) {
316     for (psn = ps->nprocs; psn--; )
317     if (ps->proc[psn].com != NULL)
318     break;
319     if (psn < 0)
320     return(-1); /* no processes this server */
321     }
322     do {
323     wait4end(); /* wait for something to end */
324     if ((psn2 = wait(&status)) == -1)
325     return(-1); /* none left */
326     ps2 = findjob(&psn2);
327     if (ps2 != NULL) /* clean up job if ours */
328     status = finishjob(ps2, psn2, status);
329     } while (ps2 == NULL || (ps != NULL && ps2 != ps));
330     return(status); /* return job status */
331     }
332     psn = pid; /* else find specific job */
333     ps2 = findjob(&psn); /* find process slot */
334     if (ps2 == NULL || (ps != NULL && ps2 != ps))
335     return(-1); /* inconsistent target */
336     ps = ps2;
337     do {
338     wait4end(); /* wait for something to end */
339     if ((psn2 = wait(&status)) == -1)
340     return(-1); /* none left */
341     ps2 = findjob(&psn2);
342     if (ps2 != NULL) /* clean up job if ours */
343     status = finishjob(ps2, psn2, status);
344     } while (ps2 != ps || psn2 != psn);
345     return(status); /* return job status */
346     }