ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.16
Committed: Mon Sep 20 16:26:58 2004 UTC (19 years, 6 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad3R7P2, rad3R7P1, rad4R0, rad3R6, rad3R6P1, rad3R8, rad3R9
Changes since 2.15: +2 -2 lines
Log Message:
Added close-on-exec flag to pipes to prevent possible deadlocks

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: netproc.c,v 2.15 2004/03/26 21:36:19 schorsch Exp $";
3 #endif
4 /*
5 * Parallel network process handling routines
6 */
7
8 #include <stdlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <signal.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <sys/wait.h>
15
16 #include "rtmisc.h"
17 #include "selcall.h"
18 #include "netproc.h"
19 #include "paths.h"
20
21 PSERVER *pslist = NULL; /* global process server list */
22
23 static NETPROC *pindex[FD_SETSIZE]; /* process index table */
24
25 static char ourhost[64]; /* this host name */
26 static char ourdir[PATH_MAX]; /* our working directory */
27 static char ouruser[32]; /* our user name */
28 static char *ourshell; /* our user's shell */
29
30 static fd_set errdesc; /* error file descriptors */
31 static int maxfd; /* maximum assigned descriptor */
32
33 extern char *remsh; /* externally defined remote shell program */
34
35 static int readerrs(int fd);
36 static void wait4end(void);
37 static int finishjob(PSERVER *ps, int pn, int status);
38
39
40 extern PSERVER *
41 addpserver( /* add a new process server */
42 char *host,
43 char *dir,
44 char *usr,
45 int np
46 )
47 {
48 register PSERVER *ps;
49 /* allocate the struct */
50 if (np < 1)
51 return(NULL);
52 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(NETPROC));
53 if (ps == NULL)
54 return(NULL);
55 if (!ourhost[0]) { /* initialize */
56 char dirtmp[PATH_MAX];
57 register char *cp;
58 register int len;
59
60 strcpy(ourhost, myhostname());
61 getcwd(dirtmp, sizeof(dirtmp));
62 if ((cp = getenv("HOME")) != NULL) {
63 if (!strcmp(cp, dirtmp))
64 ourdir[0] = '\0';
65 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
66 dirtmp[len] == '/')
67 strcpy(ourdir, dirtmp+len+1);
68 else
69 strcpy(ourdir, dirtmp);
70 } else
71 strcpy(ourdir, dirtmp);
72 if ((cp = getenv("USER")) != NULL)
73 strcpy(ouruser, cp);
74 if ((ourshell = getenv("SHELL")) == NULL)
75 ourshell = "/bin/sh";
76 FD_ZERO(&errdesc);
77 maxfd = -1;
78 }
79 /* assign host, directory, user */
80 if (host == NULL || !strcmp(host, ourhost) ||
81 !strcmp(host, LHOSTNAME))
82 ps->hostname[0] = '\0';
83 else
84 strcpy(ps->hostname, host);
85 if (dir == NULL)
86 strcpy(ps->directory, ourdir);
87 else
88 strcpy(ps->directory, dir);
89 if (usr == NULL || !strcmp(usr, ouruser))
90 ps->username[0] = '\0';
91 else
92 strcpy(ps->username, usr);
93 /* clear process slots */
94 ps->nprocs = np;
95 while (np--) {
96 ps->proc[np].com = NULL;
97 ps->proc[np].pid = -1;
98 ps->proc[np].efd = -1;
99 ps->proc[np].errs = NULL;
100 ps->proc[np].elen = 0;
101 ps->proc[np].cf = NULL;
102 }
103 /* insert in our list */
104 ps->next = pslist;
105 pslist = ps;
106 /* check for signs of life */
107 if (!pserverOK(ps)) {
108 delpserver(ps); /* failure -- abort */
109 return(NULL);
110 }
111 return(ps);
112 }
113
114
115 extern void
116 delpserver( /* delete a process server */
117 PSERVER *ps
118 )
119 {
120 PSERVER pstart;
121 register PSERVER *psp;
122 register int i;
123 /* find server in our list */
124 pstart.next = pslist;
125 for (psp = &pstart; ps != psp->next; psp = psp->next)
126 if (psp->next == NULL)
127 return; /* not in our list! */
128 /* kill any running jobs */
129 for (i = 0; i < ps->nprocs; i++)
130 if (ps->proc[i].com != NULL) {
131 kill(SIGTERM, ps->proc[i].pid);
132 wait4job(ps, ps->proc[i].pid);
133 }
134 /* remove server from list */
135 psp->next = ps->next;
136 pslist = pstart.next;
137 free((void *)ps); /* free associated memory */
138 }
139
140
141 extern PSERVER *
142 findjob( /* find out where process is running */
143 register int *pnp /* modified */
144 )
145 {
146 register PSERVER *ps;
147 register int i;
148
149 for (ps = pslist; ps != NULL; ps = ps->next)
150 for (i = 0; i < ps->nprocs; i++)
151 if (ps->proc[i].pid == *pnp) {
152 *pnp = i;
153 return(ps);
154 }
155 return(NULL); /* not found */
156 }
157
158
159 extern int
160 startjob( /* start a job on a process server */
161 register PSERVER *ps,
162 char *command,
163 pscompfunc *compf
164 )
165 {
166 char udirt[PATH_MAX];
167 char *av[16];
168 int pfd[2], pid;
169 register int i;
170
171 if (ps == NULL) { /* find a server */
172 for (ps = pslist; ps != NULL; ps = ps->next)
173 if ((i = startjob(ps, command, compf)) != -1)
174 return(i); /* got one */
175 return(-1); /* no slots anywhere */
176 }
177 for (i = 0; i < ps->nprocs; i++)
178 if (ps->proc[i].com == NULL)
179 break;
180 if (i >= ps->nprocs)
181 return(-1); /* out of process slots */
182 /* open pipe */
183 if (pipe(pfd) < 0) {
184 perror("cannot open pipe");
185 exit(1);
186 }
187 /* start child process */
188 if ((pid = fork()) == 0) {
189 close(pfd[0]); /* connect stderr to pipe */
190 if (pfd[1] != 2) {
191 dup2(pfd[1], 2);
192 close(pfd[1]);
193 }
194 if (ps->hostname[0]) { /* rsh command */
195 av[i=0] = remsh;
196 av[++i] = ps->hostname;
197 av[++i] = "-n"; /* no stdin */
198 if (ps->username[0]) { /* different user */
199 av[++i] = "-l";
200 av[++i] = ps->username;
201 av[++i] = "cd";
202 udirt[0] = '~';
203 strcpy(udirt+1, ouruser);
204 av[++i] = udirt;
205 av[++i] = ";";
206 }
207 if (ps->directory[0]) { /* change directory */
208 av[++i] = "cd";
209 av[++i] = ps->directory;
210 av[++i] = ";";
211 }
212 av[++i] = command;
213 av[++i] = NULL;
214 } else { /* shell command */
215 av[0] = ourshell;
216 av[1] = "-c";
217 av[2] = command;
218 av[3] = NULL;
219 }
220 execv(av[0], av);
221 _exit(1);
222 }
223 if (pid == -1) {
224 perror("fork failed");
225 exit(1);
226 }
227 ps->proc[i].com = command; /* assign process slot */
228 ps->proc[i].cf = compf;
229 ps->proc[i].pid = pid;
230 close(pfd[1]); /* get piped stderr file descriptor */
231 ps->proc[i].efd = pfd[0];
232 fcntl(pfd[0], F_SETFD, FD_CLOEXEC); /* set close on exec flag */
233 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
234 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
235 if (pfd[0] > maxfd)
236 maxfd = pfd[0];
237 return(pid); /* return to parent process */
238 }
239
240
241 static int
242 readerrs( /* read error output from fd */
243 int fd
244 )
245 {
246 char errbuf[BUFSIZ];
247 int nr;
248 register NETPROC *pp;
249 /* look up associated process */
250 if ((pp = pindex[fd]) == NULL)
251 abort(); /* serious consistency error */
252 nr = read(fd, errbuf, BUFSIZ-1);
253 if (nr < 0) {
254 perror("read error");
255 exit(1);
256 }
257 if (nr == 0) /* stream closed (process finished) */
258 return(0);
259 errbuf[nr] = '\0'; /* add to error buffer */
260 if (pp->elen == 0)
261 pp->errs = (char *)malloc(nr+1);
262 else
263 pp->errs = (char *)realloc((void *)pp->errs, pp->elen+nr+1);
264 if (pp->errs == NULL) {
265 perror("malloc failed");
266 exit(1);
267 }
268 strcpy(pp->errs+pp->elen, errbuf);
269 pp->elen += nr;
270 return(nr);
271 }
272
273
274 static void
275 wait4end(void) /* read error streams until someone is done */
276 {
277 fd_set readfds, excepfds;
278 register int i;
279 /* find end of descriptor set */
280 for ( ; maxfd >= 0; maxfd--)
281 if (FD_ISSET(maxfd, &errdesc))
282 break;
283 if (maxfd < 0)
284 return; /* nothing to read */
285 readfds = excepfds = errdesc;
286 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
287 for (i = 0; i <= maxfd; i++) /* get pending i/o */
288 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
289 if (readerrs(i) == 0)
290 return; /* finished process */
291 perror("select call failed");
292 exit(1);
293 }
294
295
296 static int
297 finishjob( /* clean up finished process */
298 PSERVER *ps,
299 int pn,
300 int status
301 )
302 {
303 register NETPROC *pp;
304
305 pp = ps->proc + pn;
306 if (pp->cf != NULL) /* client cleanup */
307 status = (*pp->cf)(ps, pn, status);
308 close(pp->efd); /* close error stream */
309 pindex[pp->efd] = NULL;
310 FD_CLR(pp->efd, &errdesc);
311 free((void *)pp->errs);
312 pp->com = NULL; /* clear settings */
313 pp->pid = -1;
314 pp->efd = -1;
315 pp->errs = NULL;
316 pp->elen = 0;
317 pp->cf = NULL;
318 return(status);
319 }
320
321
322 extern int
323 wait4job( /* wait for process to finish */
324 PSERVER *ps,
325 int pid
326 )
327 {
328 int status, psn, psn2;
329 PSERVER *ps2;
330
331 if (pid == -1) { /* wait for first job */
332 if (ps != NULL) {
333 for (psn = ps->nprocs; psn--; )
334 if (ps->proc[psn].com != NULL)
335 break;
336 if (psn < 0)
337 return(-1); /* no processes this server */
338 }
339 do {
340 wait4end(); /* wait for something to end */
341 if ((psn2 = wait(&status)) == -1)
342 return(-1); /* none left */
343 ps2 = findjob(&psn2);
344 if (ps2 != NULL) /* clean up job if ours */
345 status = finishjob(ps2, psn2, status);
346 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
347 return(status); /* return job status */
348 }
349 psn = pid; /* else find specific job */
350 ps2 = findjob(&psn); /* find process slot */
351 if (ps2 == NULL || (ps != NULL && ps2 != ps))
352 return(-1); /* inconsistent target */
353 ps = ps2;
354 do {
355 wait4end(); /* wait for something to end */
356 if ((psn2 = wait(&status)) == -1)
357 return(-1); /* none left */
358 ps2 = findjob(&psn2);
359 if (ps2 != NULL) /* clean up job if ours */
360 status = finishjob(ps2, psn2, status);
361 } while (ps2 != ps || psn2 != psn);
362 return(status); /* return job status */
363 }