vsys. first checkin
[vsys.git] / fifowatcher.ml
1 open Inotify
2 open Unix
3 open Globals
4 open Dirwatcher
5 open Printf
6
7 type channel_pipe = Process of out_channel | Fifo of out_channel 
8 type signed_fd = Infd of Unix.file_descr | Outfd of Unix.file_descr
9
10 (* (1) fdmap maps fifo fds -> service names, for initial read events
11  * (2) pidmap maps pids of currently executing services into fds for sigchlds to
12  *      prevent the appearence of zombies and to define the end of a session.
13  * (3) open_fds maps read events into active services
14  *)
15 let fdmap: (Unix.file_descr,string*string) Hashtbl.t = Hashtbl.create 1024
16 let pidmap: (int,signed_fd*signed_fd*Unix.file_descr) Hashtbl.t = Hashtbl.create 1024
17 let backend_prefix = ref ""
18 let open_fds: (Unix.file_descr,channel_pipe) Hashtbl.t = Hashtbl.create 1024
19
20
21
22 let receive_process_event (idesc:fd_and_fname) (_:fd_and_fname) =
23   printf "Process event\n";flush Pervasives.stdout;
24   let (_,ifd) = idesc in
25   let cp = try Hashtbl.find open_fds ifd with
26       Not_found->
27         printf "Fifo fd disappeared\n";raise Bug
28   in
29     match (cp) with 
30       | Fifo(fifo_outchan) ->
31           let process_inchan = in_channel_of_descr ifd in
32           let cont = ref true in
33           let count = ref 0 in
34             count:=!count + 1;
35             while (!cont) do
36               try 
37                 let curline = input_line process_inchan in
38                   printf "Here: %d %s\n" !count curline;flush Pervasives.stdout;
39                   fprintf fifo_outchan "%s\n" curline;flush fifo_outchan
40               with 
41                 | End_of_file|Sys_blocked_io|Unix_error(EPIPE,_,_) ->
42                     begin
43                       cont:=false
44                     end
45                 | Unix_error(_,s1,s2) -> printf "Unix error %s - %s\n" s1 s2;flush Pervasives.stdout;cont:=false
46                 | e -> printf "Error!!!\n";raise e
47             done
48       | _ -> printf "Bug! Process fd received in the channel handler\n";raise Bug
49
50
51 let rec openentry_int fifoin fifoout (abspath:string*string) =
52   let fdin =
53     try openfile fifoin [O_RDONLY;O_NONBLOCK] 0o777 with 
54         e->printf "Error opening and connecting FIFO: %s,%o\n" fifoin 0o777;flush Pervasives.stdout;raise e
55   in
56     Hashtbl.replace fdmap fdin abspath;
57     Fdwatcher.add_fd (Some(fifoin),fdin) (Some(fifoout),stdout) receive_fifo_event
58 and reopenentry_int fdin fifoin fifoout =
59   close fdin;
60     Fdwatcher.del_fd fdin;
61     let abspath = try 
62       Hashtbl.find fdmap fdin with _ -> printf "Bug: Phantom pipe\n";flush Pervasives.stdout;raise Bug
63     in
64       openentry_int fifoin fifoout abspath
65 and receive_fifo_event eventdescriptor outdescriptor =
66   let evfname,evfd = eventdescriptor in
67   let (fname_other,fd_other) = outdescriptor in
68   let outfd =
69     match (fname_other) with
70       | Some(str)->
71           (
72             try openfile str [O_WRONLY;O_NONBLOCK] 0o777 with
73                 _->printf "Problemo:%s\n" str;flush Pervasives.stdout;stdout
74           )
75       | None-> printf "Bug, nameless pipe\n";raise Bug
76   in
77   let pipe = try Hashtbl.find open_fds evfd with
78     | Not_found ->
79         (* This is a fifo fd for sure *)
80         let execpath,slice_name = Hashtbl.find fdmap evfd in
81         (* Spawn server. We assume that the fd is one fifo opened RW *)
82         let (myinfd,pout) = Unix.pipe () in
83         let (pin,myoutfd) = Unix.pipe () in
84           set_nonblock myinfd;
85           let pid = try create_process execpath [|execpath;slice_name|] pin pout pout with e -> printf "Error executing service: %s\n" execpath;flush Pervasives.stdout;raise e
86           in
87             Hashtbl.add pidmap pid (Infd(myinfd),Outfd(myoutfd),evfd);
88             Hashtbl.add open_fds evfd (Process(out_channel_of_descr myoutfd));
89             Hashtbl.add open_fds myinfd (Fifo(out_channel_of_descr outfd));
90             Fdwatcher.add_fd (None,myinfd) (None,myinfd) receive_process_event;
91             (Process(out_channel_of_descr myoutfd))
92   in
93   let inchan_fd = in_channel_of_descr evfd in
94     match (pipe) with
95       | Process(out_channel) -> 
96           let cont = ref true in
97             while (!cont) do
98               try 
99                 printf "Reading...\n";flush Pervasives.stdout;
100                 let curline = input_line inchan_fd in
101                   fprintf out_channel "%s\n" curline;flush out_channel 
102               with 
103                 |End_of_file->
104                     (
105                       match (evfname,fname_other) with
106                         | Some(str1),Some(str2)->
107                             reopenentry_int evfd str1 str2
108                         | Some(str1),None ->
109                             printf "Bug, nameless pipe\n";flush Pervasives.stdout;raise Bug
110                         | None,_ ->
111                             printf "Race condition -> user deleted file before closing it. Clever ploy, but won't work.\n";
112                             flush Pervasives.stdout
113                     );
114                       cont:=false
115                 |Sys_blocked_io ->printf "Sysblockedio\n";flush Pervasives.stdout;
116                                   cont:=false
117                 | _ ->printf "Bug: unhandled exception\n";flush Pervasives.stdout;raise Bug
118             done
119       | _ -> printf "BUG! received process event from fifo\n";raise Bug
120
121
122 let mkentry fqp abspath perm = 
123   printf "Making entry %s->%s\n" fqp abspath;flush Pervasives.stdout;
124   let fifoin=sprintf "%s.in" fqp in
125   let fifoout=sprintf "%s.out" fqp in
126     (try Unix.unlink fifoin with _ -> ());
127     (try Unix.unlink fifoout with _ -> ());
128     (try 
129        Unix.mkfifo (sprintf "%s.in" fqp) 0o666
130      with 
131          e->printf "Error creating FIFO: %s->%s,%o\n" fqp fifoin perm;flush Pervasives.stdout;raise e);
132     (try 
133        Unix.mkfifo (sprintf "%s.out" fqp) 0o666
134      with 
135          e->printf "Error creating FIFO: %s->%s,%o\n" fqp fifoout perm;flush Pervasives.stdout;raise e)
136
137
138 let openentry fqp abspath perm =
139   let fifoin = String.concat "." [fqp;"in"] in
140   let fifoout = String.concat "." [fqp;"out"] in
141     openentry_int fifoin fifoout abspath
142
143 let sigchld_handle s =
144   let pid,_=Unix.waitpid [Unix.WNOHANG] 0 in
145     try
146       let value = Hashtbl.find pidmap pid in
147         match value with
148           | (Infd(ifd),Outfd(ofd),fd) ->
149               close(ifd);close(ofd);
150               Hashtbl.remove open_fds fd;
151               Fdwatcher.del_fd ifd;
152               Hashtbl.remove pidmap pid
153           | _ -> printf "BUG! Got fds in the wrong order\n";
154                  flush Pervasives.stdout;
155                  raise Bug
156     with 
157         Not_found-> (* Do nothing, probably a grandchild *)
158           ()
159
160
161
162 let initialize () = 
163   Sys.set_signal Sys.sigchld (Sys.Signal_handle sigchld_handle)