X-Git-Url: http://git.onelab.eu/?p=vsys.git;a=blobdiff_plain;f=fifowatcher.ml;h=58113a138ce3ec35e2eab0f81ba551f12cc042bc;hp=7871821a4ffd078fcfc98b1a18adc67eed3bf425;hb=HEAD;hpb=892d26d4e05de55ae94d21a586e6a0bfa724e327 diff --git a/fifowatcher.ml b/fifowatcher.ml index 7871821..58113a1 100644 --- a/fifowatcher.ml +++ b/fifowatcher.ml @@ -5,52 +5,57 @@ open Unix open Globals open Dirwatcher open Printf +open Splice (** A connected process, FIFO *) -type channel_pipe = Process of out_channel | Fifo of out_channel +type channel_pipe = Process of Unix.file_descr | Fifo of Unix.file_descr | BrokenPipe -type signed_fd = Infd of Unix.file_descr | Outfd of Unix.file_descr +(** Signed file descriptors. *) +type signed_fd = Infd of Unix.file_descr | Outfd of Unix.file_descr | Eventfd of Unix.file_descr +(** XXX This will get deprecated when we switch to inotify *) let fdmap: (Unix.file_descr,string*string) Hashtbl.t = Hashtbl.create 1024 -let pidmap: (int,signed_fd*signed_fd*Unix.file_descr) Hashtbl.t = Hashtbl.create 1024 + +(** Maps pids to slice connections. Needed to clean up fds when a script dies + with EPIPE *) +let pidmap: (int,signed_fd list) Hashtbl.t = Hashtbl.create 1024 let backend_prefix = ref "" let open_fds: (Unix.file_descr,channel_pipe) Hashtbl.t = Hashtbl.create 1024 (** Receive an event from a running script. This event must be relayed to the - slice that invoked it + slice that invoked it. + @param idesc fd/fname identifier for process *) let receive_process_event (idesc: fname_and_fd) (_: fname_and_fd) = let (_,ifd) = idesc in let cp = try Hashtbl.find open_fds ifd with Not_found-> - printf "Fifo fd disappeared\n";raise Bug + logprint "Fifo fd disappeared\n";raise Bug in match (cp) with - | Fifo(fifo_outchan) -> - let process_inchan = in_channel_of_descr ifd in - let cont = ref true in - let count = ref 0 in - count:=!count + 1; - while (!cont) do - try - let curline = input_line process_inchan in - fprintf fifo_outchan "%s\n" curline;flush fifo_outchan - with - | End_of_file|Sys_blocked_io|Unix_error(EPIPE,_,_)|Unix_error(EBADF,_,_) -> - begin - cont:=false - end - | Unix_error(_,s1,s2) -> printf "Unix error %s - %s\n" s1 s2;flush Pervasives.stdout;cont:=false - | e -> printf "Error - received unexpected event from file system !!!\n";raise e - done - | _ -> printf "Bug! Process fd received in the channel handler\n";raise Bug - + | Fifo(fifo_outfd) -> + begin + try + printf "Received process event\n";flush Pervasives.stdout; + (* transferred = 4096 => there were at least 4096 bytes in the + * stream, so we should try again. + * transferred < 4096 => EAGAIN => either this is all the data we + * have (move on) + * OR XXX the receiver is blocking (supposedly this can't happen) *) + let transferred = ref 4096 in + while (!transferred == 4096) do + transferred:=tee ifd fifo_outfd 4096 + done; + with + Failure(s)->logprint "Transfer failure: %s\n" s + end + | _ -> logprint "Bug! Process fd received in the channel handler\n";raise Bug let rec openentry_int fifoin fifoout (abspath:string*string) = let fdin = try openfile fifoin [O_RDONLY;O_NONBLOCK] 0o777 with - e->printf "Error opening and connecting FIFO: %s,%o\n" fifoin 0o777;flush Pervasives.stdout;raise e + e->logprint "Error opening and connecting FIFO: %s,%o\n" fifoin 0o777;raise e in Hashtbl.replace fdmap fdin abspath; Fdwatcher.add_fd (Some(fifoin),fdin) (Some(fifoout),stdout) receive_fifo_event @@ -58,82 +63,106 @@ and reopenentry_int fdin fifoin fifoout = close fdin; Fdwatcher.del_fd fdin; let abspath = try - Hashtbl.find fdmap fdin with _ -> printf "Bug: Phantom pipe\n";flush Pervasives.stdout;raise Bug + Hashtbl.find fdmap fdin with _ -> logprint "Bug: Phantom pipe\n";raise Bug in openentry_int fifoin fifoout abspath + +(** receive an event from a fifo and connect to the corresponding service, or to + create it if it doesn't exit + @param eventdescriptor Name of input pipe,in descriptor + @param outdescriptor Name of output pipe, out descriptor + *) and receive_fifo_event eventdescriptor outdescriptor = - let evfname,evfd = eventdescriptor in + let (evfname,evfd) = eventdescriptor in let (fname_other,fd_other) = outdescriptor in + (* Open the output pipe, or use stdout instead *) let outfd = match (fname_other) with | Some(str)-> ( try openfile str [O_WRONLY;O_NONBLOCK] 0o777 with - _->printf "Output pipe not open, using stdout in place of %s\n" str;flush Pervasives.stdout;stdout + _->logprint "Output pipe not open, using stdout in place of %s\n" str;stdout ) - | None-> printf "Bug, nameless pipe\n";raise Bug + | None-> logprint "Bug, nameless pipe\n";raise Bug in + (* Check if the input descriptor is already registered (=> a session is open). + If not, register it and start a new session.*) let pipe = try Hashtbl.find open_fds evfd with | Not_found -> - (* This is a fifo fd for sure *) + (* Ok, need to launch script *) let execpath,slice_name = Hashtbl.find fdmap evfd in - (* Spawn server. We assume that the fd is one fifo opened RW *) - let (myinfd,pout) = Unix.pipe () in - let (pin,myoutfd) = Unix.pipe () in - set_nonblock myinfd; - let pid = try create_process execpath [|execpath;slice_name|] pin pout pout with e -> printf "Error executing service: %s\n" execpath;flush Pervasives.stdout;raise e + let (script_infd,pout) = Unix.pipe () in + let (pin,script_outfd) = Unix.pipe () in + set_nonblock script_infd; + ignore(sigprocmask SIG_BLOCK [Sys.sigchld]); + let rpid = try Some(create_process execpath [|execpath;slice_name|] pin pout pout) with e -> logprint "Error executing service: %s\n" execpath;None in - Hashtbl.add pidmap pid (Infd(myinfd),Outfd(myoutfd),evfd); - Hashtbl.add open_fds evfd (Process(out_channel_of_descr myoutfd)); - Hashtbl.add open_fds myinfd (Fifo(out_channel_of_descr outfd)); - Fdwatcher.add_fd (None,myinfd) (None,myinfd) receive_process_event; - (Process(out_channel_of_descr myoutfd)) + match rpid with + | None-> BrokenPipe + | Some(pid)-> + (* Register fds associated with pid so that they can be cleaned up + * when it dies *) + Hashtbl.add pidmap pid [Infd(script_infd);Outfd(script_outfd);Eventfd(evfd)]; + + (* Connect pipe to running script *) + Hashtbl.add open_fds evfd (Process(script_outfd)); + + (* Connect the running script to the pipe *) + Hashtbl.add open_fds script_infd (Fifo(outfd)); + + (* Activate running script *) + Fdwatcher.add_fd (None,script_infd) (None,script_infd) receive_process_event; + + (Process(script_outfd)) in - let inchan_fd = in_channel_of_descr evfd in + (* We have the connection to the process - because it was open, or because it + just got established *) match (pipe) with - | Process(out_channel) -> - let cont = ref true in - while (!cont) do - try - printf "Reading...\n";flush Pervasives.stdout; - let curline = input_line inchan_fd in - fprintf out_channel "%s\n" curline;flush out_channel - with - |End_of_file-> - ( - match (evfname,fname_other) with - | Some(str1),Some(str2)-> - reopenentry_int evfd str1 str2 - | Some(str1),None -> - printf "Bug, nameless pipe\n";flush Pervasives.stdout;raise Bug - | None,_ -> - printf "Race condition -> user deleted file before closing it. Clever ploy, but won't work.\n"; - flush Pervasives.stdout - ); - cont:=false - |Sys_blocked_io ->printf "Sysblockedio\n";flush Pervasives.stdout; - cont:=false - | _ ->printf "Bug: unhandled exception\n";flush Pervasives.stdout;raise Bug - done - | _ -> printf "BUG! received process event from fifo\n";raise Bug - - -let mkentry fqp abspath perm = - printf "Making entry %s->%s\n" fqp abspath;flush Pervasives.stdout; + | Process(fifo_outfd) -> + begin + try + let transferred = ref 4096 in + while (!transferred == 4096) do + begin + transferred:=tee evfd fifo_outfd 4096; + printf "Transferred: %d\n" !transferred;flush Pervasives.stdout + end + done; + with Failure(str) -> + begin + logprint "Error connecting user to service: %s\n" str + end; + ignore(sigprocmask SIG_UNBLOCK [Sys.sigchld]); + printf "Out of the loop\n";flush Pervasives.stdout + + end + | BrokenPipe -> () + | Fifo(_) -> logprint "BUG! received process event from fifo\n";raise Bug + + +(** Make a pair of fifo entries *) +let mkentry fqp abspath perm uname = + logprint "Making entry %s->%s\n" fqp abspath; let fifoin=sprintf "%s.in" fqp in let fifoout=sprintf "%s.out" fqp in (try Unix.unlink fifoin with _ -> ()); (try Unix.unlink fifoout with _ -> ()); (try - Unix.mkfifo (sprintf "%s.in" fqp) 0o666 - with - e->printf "Error creating FIFO: %s->%s,%o\n" fqp fifoin perm;flush Pervasives.stdout;raise e); - (try - Unix.mkfifo (sprintf "%s.out" fqp) 0o666 + let infname =(sprintf "%s.in" fqp) in + let outfname =(sprintf "%s.out" fqp) in + Unix.mkfifo infname 0o666; + Unix.mkfifo outfname 0o666; + ( (* Make the user the owner of the pipes in a non-chroot environment *) + if (!Globals.nochroot) then + let pwentry = Unix.getpwnam uname in + Unix.chown infname pwentry.pw_uid pwentry.pw_gid; + Unix.chown outfname pwentry.pw_uid pwentry.pw_gid + ); + Success with - e->printf "Error creating FIFO: %s->%s,%o\n" fqp fifoout perm;flush Pervasives.stdout;raise e) + e->logprint "Error creating FIFO: %s->%s. May be something wrong at the frontend.\n" fqp fifoout;Failed) -(** Open fifos for a session *) +(** Open fifos for a session. Will shutdown vsys if the fifos don't exist *) let openentry fqp abspath perm = let fifoin = String.concat "." [fqp;"in"] in let fifoout = String.concat "." [fqp;"out"] in @@ -142,16 +171,19 @@ let openentry fqp abspath perm = let sigchld_handle s = let pid,_=Unix.waitpid [Unix.WNOHANG] 0 in try - let value = Hashtbl.find pidmap pid in - match value with - | (Infd(ifd),Outfd(ofd),fd) -> - close(ifd);close(ofd); - Hashtbl.remove open_fds fd; - Fdwatcher.del_fd ifd; - Hashtbl.remove pidmap pid - | _ -> printf "BUG! Got fds in the wrong order\n"; - flush Pervasives.stdout; - raise Bug + let sfd_list = Hashtbl.find pidmap pid in + let handle_sfd sfd = + match sfd with + | Infd(fd) -> + close fd; + Fdwatcher.del_fd fd + | Outfd(fd)-> + close fd + | Eventfd(fd)-> + Hashtbl.remove open_fds fd (* Disconnect pipe *) + in + List.iter handle_sfd sfd_list; + Hashtbl.remove pidmap pid with Not_found-> (* Do nothing, probably a grandchild *) ()