ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/radiance/ray/src/util/netproc.c
Revision: 2.17
Committed: Mon Mar 15 21:31:50 2010 UTC (14 years, 7 months ago) by greg
Content type: text/plain
Branch: MAIN
CVS Tags: rad5R4, rad5R2, rad4R2P2, rad5R0, rad5R1, rad4R2, rad4R1, rad4R2P1, rad5R3, HEAD
Changes since 2.16: +8 -6 lines
Log Message:
Changed behavior so that there is no cd to the user's remote home directory when a full path is given

File Contents

# Content
1 #ifndef lint
2 static const char RCSid[] = "$Id: netproc.c,v 2.16 2004/09/20 16:26:58 greg Exp $";
3 #endif
4 /*
5 * Parallel network process handling routines
6 */
7
8 #include <stdlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <signal.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <sys/wait.h>
15
16 #include "rtmisc.h"
17 #include "selcall.h"
18 #include "netproc.h"
19 #include "paths.h"
20
21 PSERVER *pslist = NULL; /* global process server list */
22
23 static NETPROC *pindex[FD_SETSIZE]; /* process index table */
24
25 static char ourhost[64]; /* this host name */
26 static char ourdir[PATH_MAX]; /* our working directory */
27 static char ouruser[32]; /* our user name */
28 static char *ourshell; /* our user's shell */
29
30 static fd_set errdesc; /* error file descriptors */
31 static int maxfd; /* maximum assigned descriptor */
32
33 extern char *remsh; /* externally defined remote shell program */
34
35 static int readerrs(int fd);
36 static void wait4end(void);
37 static int finishjob(PSERVER *ps, int pn, int status);
38
39
40 extern PSERVER *
41 addpserver( /* add a new process server */
42 char *host,
43 char *dir,
44 char *usr,
45 int np
46 )
47 {
48 register PSERVER *ps;
49 /* allocate the struct */
50 if (np < 1)
51 return(NULL);
52 ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(NETPROC));
53 if (ps == NULL)
54 return(NULL);
55 if (!ourhost[0]) { /* initialize */
56 char dirtmp[PATH_MAX];
57 register char *cp;
58 register int len;
59
60 strcpy(ourhost, myhostname());
61 getcwd(dirtmp, sizeof(dirtmp));
62 if ((cp = getenv("HOME")) != NULL) {
63 if (!strcmp(cp, dirtmp))
64 ourdir[0] = '\0';
65 else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
66 dirtmp[len] == '/')
67 strcpy(ourdir, dirtmp+len+1);
68 else
69 strcpy(ourdir, dirtmp);
70 } else
71 strcpy(ourdir, dirtmp);
72 if ((cp = getenv("USER")) != NULL)
73 strcpy(ouruser, cp);
74 if ((ourshell = getenv("SHELL")) == NULL)
75 ourshell = "/bin/sh";
76 FD_ZERO(&errdesc);
77 maxfd = -1;
78 }
79 /* assign host, directory, user */
80 if (host == NULL || !strcmp(host, ourhost) ||
81 !strcmp(host, LHOSTNAME))
82 ps->hostname[0] = '\0';
83 else
84 strcpy(ps->hostname, host);
85 if (dir == NULL)
86 strcpy(ps->directory, ourdir);
87 else
88 strcpy(ps->directory, dir);
89 if (usr == NULL || !strcmp(usr, ouruser))
90 ps->username[0] = '\0';
91 else
92 strcpy(ps->username, usr);
93 /* clear process slots */
94 ps->nprocs = np;
95 while (np--) {
96 ps->proc[np].com = NULL;
97 ps->proc[np].pid = -1;
98 ps->proc[np].efd = -1;
99 ps->proc[np].errs = NULL;
100 ps->proc[np].elen = 0;
101 ps->proc[np].cf = NULL;
102 }
103 /* insert in our list */
104 ps->next = pslist;
105 pslist = ps;
106 /* check for signs of life */
107 if (!pserverOK(ps)) {
108 delpserver(ps); /* failure -- abort */
109 return(NULL);
110 }
111 return(ps);
112 }
113
114
115 extern void
116 delpserver( /* delete a process server */
117 PSERVER *ps
118 )
119 {
120 PSERVER pstart;
121 register PSERVER *psp;
122 register int i;
123 /* find server in our list */
124 pstart.next = pslist;
125 for (psp = &pstart; ps != psp->next; psp = psp->next)
126 if (psp->next == NULL)
127 return; /* not in our list! */
128 /* kill any running jobs */
129 for (i = 0; i < ps->nprocs; i++)
130 if (ps->proc[i].com != NULL) {
131 kill(SIGTERM, ps->proc[i].pid);
132 wait4job(ps, ps->proc[i].pid);
133 }
134 /* remove server from list */
135 psp->next = ps->next;
136 pslist = pstart.next;
137 free((void *)ps); /* free associated memory */
138 }
139
140
141 extern PSERVER *
142 findjob( /* find out where process is running */
143 register int *pnp /* modified */
144 )
145 {
146 register PSERVER *ps;
147 register int i;
148
149 for (ps = pslist; ps != NULL; ps = ps->next)
150 for (i = 0; i < ps->nprocs; i++)
151 if (ps->proc[i].pid == *pnp) {
152 *pnp = i;
153 return(ps);
154 }
155 return(NULL); /* not found */
156 }
157
158
159 extern int
160 startjob( /* start a job on a process server */
161 register PSERVER *ps,
162 char *command,
163 pscompfunc *compf
164 )
165 {
166 char udirt[PATH_MAX];
167 char *av[16];
168 int pfd[2], pid;
169 register int i;
170
171 if (ps == NULL) { /* find a server */
172 for (ps = pslist; ps != NULL; ps = ps->next)
173 if ((i = startjob(ps, command, compf)) != -1)
174 return(i); /* got one */
175 return(-1); /* no slots anywhere */
176 }
177 for (i = 0; i < ps->nprocs; i++)
178 if (ps->proc[i].com == NULL)
179 break;
180 if (i >= ps->nprocs)
181 return(-1); /* out of process slots */
182 /* open pipe */
183 if (pipe(pfd) < 0) {
184 perror("cannot open pipe");
185 exit(1);
186 }
187 /* start child process */
188 if ((pid = fork()) == 0) {
189 close(pfd[0]); /* connect stderr to pipe */
190 if (pfd[1] != 2) {
191 dup2(pfd[1], 2);
192 close(pfd[1]);
193 }
194 if (ps->hostname[0]) { /* rsh command */
195 av[i=0] = remsh;
196 av[++i] = ps->hostname;
197 av[++i] = "-n"; /* no stdin */
198 if (ps->username[0]) { /* different user */
199 av[++i] = "-l";
200 av[++i] = ps->username;
201 if (ps->directory[0] != '/') {
202 av[++i] = "cd";
203 udirt[0] = '~';
204 strcpy(udirt+1, ouruser);
205 av[++i] = udirt;
206 av[++i] = ";";
207 }
208 }
209 if (ps->directory[0]) { /* change directory */
210 av[++i] = "cd";
211 av[++i] = ps->directory;
212 av[++i] = ";";
213 }
214 av[++i] = command;
215 av[++i] = NULL;
216 } else { /* shell command */
217 av[0] = ourshell;
218 av[1] = "-c";
219 av[2] = command;
220 av[3] = NULL;
221 }
222 execv(av[0], av);
223 _exit(1);
224 }
225 if (pid == -1) {
226 perror("fork failed");
227 exit(1);
228 }
229 ps->proc[i].com = command; /* assign process slot */
230 ps->proc[i].cf = compf;
231 ps->proc[i].pid = pid;
232 close(pfd[1]); /* get piped stderr file descriptor */
233 ps->proc[i].efd = pfd[0];
234 fcntl(pfd[0], F_SETFD, FD_CLOEXEC); /* set close on exec flag */
235 pindex[pfd[0]] = ps->proc + i; /* assign error fd index */
236 FD_SET(pfd[0], &errdesc); /* add to select call parameter */
237 if (pfd[0] > maxfd)
238 maxfd = pfd[0];
239 return(pid); /* return to parent process */
240 }
241
242
243 static int
244 readerrs( /* read error output from fd */
245 int fd
246 )
247 {
248 char errbuf[BUFSIZ];
249 int nr;
250 register NETPROC *pp;
251 /* look up associated process */
252 if ((pp = pindex[fd]) == NULL)
253 abort(); /* serious consistency error */
254 nr = read(fd, errbuf, BUFSIZ-1);
255 if (nr < 0) {
256 perror("read error");
257 exit(1);
258 }
259 if (nr == 0) /* stream closed (process finished) */
260 return(0);
261 errbuf[nr] = '\0'; /* add to error buffer */
262 if (pp->elen == 0)
263 pp->errs = (char *)malloc(nr+1);
264 else
265 pp->errs = (char *)realloc((void *)pp->errs, pp->elen+nr+1);
266 if (pp->errs == NULL) {
267 perror("malloc failed");
268 exit(1);
269 }
270 strcpy(pp->errs+pp->elen, errbuf);
271 pp->elen += nr;
272 return(nr);
273 }
274
275
276 static void
277 wait4end(void) /* read error streams until someone is done */
278 {
279 fd_set readfds, excepfds;
280 register int i;
281 /* find end of descriptor set */
282 for ( ; maxfd >= 0; maxfd--)
283 if (FD_ISSET(maxfd, &errdesc))
284 break;
285 if (maxfd < 0)
286 return; /* nothing to read */
287 readfds = excepfds = errdesc;
288 while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
289 for (i = 0; i <= maxfd; i++) /* get pending i/o */
290 if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
291 if (readerrs(i) == 0)
292 return; /* finished process */
293 perror("select call failed");
294 exit(1);
295 }
296
297
298 static int
299 finishjob( /* clean up finished process */
300 PSERVER *ps,
301 int pn,
302 int status
303 )
304 {
305 register NETPROC *pp;
306
307 pp = ps->proc + pn;
308 if (pp->cf != NULL) /* client cleanup */
309 status = (*pp->cf)(ps, pn, status);
310 close(pp->efd); /* close error stream */
311 pindex[pp->efd] = NULL;
312 FD_CLR(pp->efd, &errdesc);
313 free((void *)pp->errs);
314 pp->com = NULL; /* clear settings */
315 pp->pid = -1;
316 pp->efd = -1;
317 pp->errs = NULL;
318 pp->elen = 0;
319 pp->cf = NULL;
320 return(status);
321 }
322
323
324 extern int
325 wait4job( /* wait for process to finish */
326 PSERVER *ps,
327 int pid
328 )
329 {
330 int status, psn, psn2;
331 PSERVER *ps2;
332
333 if (pid == -1) { /* wait for first job */
334 if (ps != NULL) {
335 for (psn = ps->nprocs; psn--; )
336 if (ps->proc[psn].com != NULL)
337 break;
338 if (psn < 0)
339 return(-1); /* no processes this server */
340 }
341 do {
342 wait4end(); /* wait for something to end */
343 if ((psn2 = wait(&status)) == -1)
344 return(-1); /* none left */
345 ps2 = findjob(&psn2);
346 if (ps2 != NULL) /* clean up job if ours */
347 status = finishjob(ps2, psn2, status);
348 } while (ps2 == NULL || (ps != NULL && ps2 != ps));
349 return(status); /* return job status */
350 }
351 psn = pid; /* else find specific job */
352 ps2 = findjob(&psn); /* find process slot */
353 if (ps2 == NULL || (ps != NULL && ps2 != ps))
354 return(-1); /* inconsistent target */
355 ps = ps2;
356 do {
357 wait4end(); /* wait for something to end */
358 if ((psn2 = wait(&status)) == -1)
359 return(-1); /* none left */
360 ps2 = findjob(&psn2);
361 if (ps2 != NULL) /* clean up job if ours */
362 status = finishjob(ps2, psn2, status);
363 } while (ps2 != ps || psn2 != psn);
364 return(status); /* return job status */
365 }