ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.15
Committed: Fri Mar 26 21:36:19 2004 UTC (20 years ago) by schorsch
Content type: text/plain
Branch: MAIN
Changes since 2.14: +43 -27 lines
Log Message:
Continued ANSIfication.

File Contents

# User Rev Content
1 greg 2.1 #ifndef lint
2 schorsch 2.15 static const char RCSid[] = "$Id: netproc.c,v 2.14 2003/11/11 16:24:06 greg Exp $";
3 greg 2.1 #endif
4     /*
5     * Parallel network process handling routines
6     */
7    
8 schorsch 2.12 #include <stdlib.h>
9 greg 2.1 #include <stdio.h>
10 schorsch 2.12 #include <string.h>
11 greg 2.1 #include <signal.h>
12     #include <fcntl.h>
13 schorsch 2.11 #include <unistd.h>
14 schorsch 2.15 #include <sys/wait.h>
15 schorsch 2.11
16 schorsch 2.15 #include "rtmisc.h"
17 gregl 2.8 #include "selcall.h"
18 greg 2.1 #include "netproc.h"
19 greg 2.5 #include "paths.h"
20 greg 2.1
21     PSERVER *pslist = NULL; /* global process server list */
22    
23 schorsch 2.12 static NETPROC *pindex[FD_SETSIZE]; /* process index table */
24 greg 2.1
25     static char ourhost[64]; /* this host name */
26 schorsch 2.11 static char ourdir[PATH_MAX]; /* our working directory */
27 greg 2.1 static char ouruser[32]; /* our user name */
28     static char *ourshell; /* our user's shell */
29    
30     static fd_set errdesc; /* error file descriptors */
31     static int maxfd; /* maximum assigned descriptor */
32    
33 greg 2.3 extern char *remsh; /* externally defined remote shell program */
34    
35 schorsch 2.15 static int readerrs(int fd);
36     static void wait4end(void);
37     static int finishjob(PSERVER *ps, int pn, int status);
38 greg 2.1
39 schorsch 2.15
40     extern PSERVER *
41     addpserver( /* add a new process server */
42     char *host,
43     char *dir,
44     char *usr,
45     int np
46     )
47 greg 2.1 {
48     register PSERVER *ps;
49     /* allocate the struct */
50     if (np < 1)
51     return(NULL);
52 schorsch 2.12 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(NETPROC));
53 greg 2.1 if (ps == NULL)
54     return(NULL);
55     if (!ourhost[0]) { /* initialize */
56 schorsch 2.11 char dirtmp[PATH_MAX];
57 greg 2.1 register char *cp;
58     register int len;
59    
60 greg 2.2 strcpy(ourhost, myhostname());
61 schorsch 2.11 getcwd(dirtmp, sizeof(dirtmp));
62 greg 2.1 if ((cp = getenv("HOME")) != NULL) {
63     if (!strcmp(cp, dirtmp))
64     ourdir[0] = '\0';
65     else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
66     dirtmp[len] == '/')
67     strcpy(ourdir, dirtmp+len+1);
68     else
69     strcpy(ourdir, dirtmp);
70     } else
71     strcpy(ourdir, dirtmp);
72     if ((cp = getenv("USER")) != NULL)
73     strcpy(ouruser, cp);
74     if ((ourshell = getenv("SHELL")) == NULL)
75     ourshell = "/bin/sh";
76     FD_ZERO(&errdesc);
77     maxfd = -1;
78     }
79     /* assign host, directory, user */
80     if (host == NULL || !strcmp(host, ourhost) ||
81     !strcmp(host, LHOSTNAME))
82     ps->hostname[0] = '\0';
83     else
84     strcpy(ps->hostname, host);
85     if (dir == NULL)
86     strcpy(ps->directory, ourdir);
87     else
88     strcpy(ps->directory, dir);
89     if (usr == NULL || !strcmp(usr, ouruser))
90     ps->username[0] = '\0';
91     else
92     strcpy(ps->username, usr);
93     /* clear process slots */
94     ps->nprocs = np;
95     while (np--) {
96     ps->proc[np].com = NULL;
97     ps->proc[np].pid = -1;
98     ps->proc[np].efd = -1;
99     ps->proc[np].errs = NULL;
100     ps->proc[np].elen = 0;
101     ps->proc[np].cf = NULL;
102     }
103     /* insert in our list */
104     ps->next = pslist;
105     pslist = ps;
106     /* check for signs of life */
107     if (!pserverOK(ps)) {
108     delpserver(ps); /* failure -- abort */
109     return(NULL);
110     }
111     return(ps);
112     }
113    
114    
115 schorsch 2.15 extern void
116     delpserver( /* delete a process server */
117     PSERVER *ps
118     )
119 greg 2.1 {
120     PSERVER pstart;
121     register PSERVER *psp;
122     register int i;
123     /* find server in our list */
124     pstart.next = pslist;
125     for (psp = &pstart; ps != psp->next; psp = psp->next)
126     if (psp->next == NULL)
127     return; /* not in our list! */
128     /* kill any running jobs */
129     for (i = 0; i < ps->nprocs; i++)
130     if (ps->proc[i].com != NULL) {
131     kill(SIGTERM, ps->proc[i].pid);
132     wait4job(ps, ps->proc[i].pid);
133     }
134     /* remove server from list */
135     psp->next = ps->next;
136     pslist = pstart.next;
137 greg 2.9 free((void *)ps); /* free associated memory */
138 greg 2.1 }
139    
140    
141 schorsch 2.15 extern PSERVER *
142     findjob( /* find out where process is running */
143     register int *pnp /* modified */
144     )
145 greg 2.1 {
146     register PSERVER *ps;
147     register int i;
148    
149     for (ps = pslist; ps != NULL; ps = ps->next)
150     for (i = 0; i < ps->nprocs; i++)
151     if (ps->proc[i].pid == *pnp) {
152     *pnp = i;
153     return(ps);
154     }
155     return(NULL); /* not found */
156     }
157    
158    
159 schorsch 2.15 extern int
160     startjob( /* start a job on a process server */
161     register PSERVER *ps,
162     char *command,
163     pscompfunc *compf
164     )
165 greg 2.1 {
166 schorsch 2.11 char udirt[PATH_MAX];
167 greg 2.3 char *av[16];
168 greg 2.1 int pfd[2], pid;
169     register int i;
170    
171     if (ps == NULL) { /* find a server */
172     for (ps = pslist; ps != NULL; ps = ps->next)
173     if ((i = startjob(ps, command, compf)) != -1)
174     return(i); /* got one */
175     return(-1); /* no slots anywhere */
176     }
177     for (i = 0; i < ps->nprocs; i++)
178     if (ps->proc[i].com == NULL)
179     break;
180     if (i >= ps->nprocs)
181     return(-1); /* out of process slots */
182     /* open pipe */
183     if (pipe(pfd) < 0) {
184     perror("cannot open pipe");
185     exit(1);
186     }
187     /* start child process */
188 greg 2.14 if ((pid = fork()) == 0) {
189 greg 2.1 close(pfd[0]); /* connect stderr to pipe */
190     if (pfd[1] != 2) {
191     dup2(pfd[1], 2);
192     close(pfd[1]);
193     }
194     if (ps->hostname[0]) { /* rsh command */
195 greg 2.3 av[i=0] = remsh;
196 greg 2.1 av[++i] = ps->hostname;
197 greg 2.3 av[++i] = "-n"; /* no stdin */
198     if (ps->username[0]) { /* different user */
199 greg 2.1 av[++i] = "-l";
200     av[++i] = ps->username;
201 greg 2.3 av[++i] = "cd";
202     udirt[0] = '~';
203     strcpy(udirt+1, ouruser);
204     av[++i] = udirt;
205     av[++i] = ";";
206 greg 2.1 }
207 greg 2.3 if (ps->directory[0]) { /* change directory */
208 greg 2.1 av[++i] = "cd";
209     av[++i] = ps->directory;
210     av[++i] = ";";
211     }
212     av[++i] = command;
213     av[++i] = NULL;
214     } else { /* shell command */
215     av[0] = ourshell;
216     av[1] = "-c";
217     av[2] = command;
218     av[3] = NULL;
219     }
220     execv(av[0], av);
221     _exit(1);
222     }
223     if (pid == -1) {
224     perror("fork failed");
225     exit(1);
226     }
227     ps->proc[i].com = command; /* assign process slot */
228     ps->proc[i].cf = compf;
229     ps->proc[i].pid = pid;
230     close(pfd[1]); /* get piped stderr file descriptor */
231     ps->proc[i].efd = pfd[0];
232     fcntl(pfd[0], F_SETFD, 1); /* set close on exec flag */
233     pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
234     FD_SET(pfd[0], &errdesc); /* add to select call parameter */
235     if (pfd[0] > maxfd)
236     maxfd = pfd[0];
237     return(pid); /* return to parent process */
238     }
239    
240    
241     static int
242 schorsch 2.15 readerrs( /* read error output from fd */
243     int fd
244     )
245 greg 2.1 {
246     char errbuf[BUFSIZ];
247     int nr;
248 schorsch 2.12 register NETPROC *pp;
249 greg 2.1 /* look up associated process */
250     if ((pp = pindex[fd]) == NULL)
251     abort(); /* serious consistency error */
252     nr = read(fd, errbuf, BUFSIZ-1);
253     if (nr < 0) {
254     perror("read error");
255     exit(1);
256     }
257     if (nr == 0) /* stream closed (process finished) */
258     return(0);
259     errbuf[nr] = '\0'; /* add to error buffer */
260     if (pp->elen == 0)
261     pp->errs = (char *)malloc(nr+1);
262     else
263 greg 2.10 pp->errs = (char *)realloc((void *)pp->errs, pp->elen+nr+1);
264 greg 2.1 if (pp->errs == NULL) {
265     perror("malloc failed");
266     exit(1);
267     }
268     strcpy(pp->errs+pp->elen, errbuf);
269     pp->elen += nr;
270     return(nr);
271     }
272    
273    
274 schorsch 2.15 static void
275     wait4end(void) /* read error streams until someone is done */
276 greg 2.1 {
277     fd_set readfds, excepfds;
278     register int i;
279     /* find end of descriptor set */
280     for ( ; maxfd >= 0; maxfd--)
281     if (FD_ISSET(maxfd, &errdesc))
282     break;
283     if (maxfd < 0)
284     return; /* nothing to read */
285     readfds = excepfds = errdesc;
286     while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
287     for (i = 0; i <= maxfd; i++) /* get pending i/o */
288     if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
289     if (readerrs(i) == 0)
290     return; /* finished process */
291     perror("select call failed");
292     exit(1);
293     }
294    
295    
296     static int
297 schorsch 2.15 finishjob( /* clean up finished process */
298     PSERVER *ps,
299     int pn,
300     int status
301     )
302 greg 2.1 {
303 schorsch 2.12 register NETPROC *pp;
304 greg 2.1
305     pp = ps->proc + pn;
306     if (pp->cf != NULL) /* client cleanup */
307     status = (*pp->cf)(ps, pn, status);
308     close(pp->efd); /* close error stream */
309     pindex[pp->efd] = NULL;
310     FD_CLR(pp->efd, &errdesc);
311 greg 2.9 free((void *)pp->errs);
312 greg 2.1 pp->com = NULL; /* clear settings */
313     pp->pid = -1;
314     pp->efd = -1;
315     pp->errs = NULL;
316     pp->elen = 0;
317     pp->cf = NULL;
318     return(status);
319     }
320    
321    
322 schorsch 2.15 extern int
323     wait4job( /* wait for process to finish */
324     PSERVER *ps,
325     int pid
326     )
327 greg 2.1 {
328     int status, psn, psn2;
329     PSERVER *ps2;
330    
331     if (pid == -1) { /* wait for first job */
332     if (ps != NULL) {
333     for (psn = ps->nprocs; psn--; )
334     if (ps->proc[psn].com != NULL)
335     break;
336     if (psn < 0)
337     return(-1); /* no processes this server */
338     }
339     do {
340     wait4end(); /* wait for something to end */
341     if ((psn2 = wait(&status)) == -1)
342     return(-1); /* none left */
343     ps2 = findjob(&psn2);
344     if (ps2 != NULL) /* clean up job if ours */
345     status = finishjob(ps2, psn2, status);
346     } while (ps2 == NULL || (ps != NULL && ps2 != ps));
347     return(status); /* return job status */
348     }
349     psn = pid; /* else find specific job */
350     ps2 = findjob(&psn); /* find process slot */
351     if (ps2 == NULL || (ps != NULL && ps2 != ps))
352     return(-1); /* inconsistent target */
353     ps = ps2;
354     do {
355     wait4end(); /* wait for something to end */
356     if ((psn2 = wait(&status)) == -1)
357     return(-1); /* none left */
358     ps2 = findjob(&psn2);
359     if (ps2 != NULL) /* clean up job if ours */
360     status = finishjob(ps2, psn2, status);
361     } while (ps2 != ps || psn2 != psn);
362     return(status); /* return job status */
363     }