/pliant/language/stream/tcp.pli
 
 1  # Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx 
 2  # 
 3  # This program is free software; you can redistribute it and/or 
 4  # modify it under the terms of the GNU General Public License version 2 
 5  # as published by the Free Software Foundation. 
 6  # 
 7  # This program is distributed in the hope that it will be useful, 
 8  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 9  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 10  # GNU General Public License for more details. 
 11  # 
 12  # You should have received a copy of the GNU General Public License 
 13  # version 2 along with this program; if not, write to the Free Software 
 14  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. 
 15   
 16  module "ring.pli" 
 17  module "/pliant/language/os/socket.pli" 
 18   
 19  constant trace false 
 20  constant debug false 
 21  constant reuse true 
 22  constant default_queue_length 16 
 23  constant changesettings true 
 24  constant waitminimaldelay 0.001 
 25   
 26   
 27 
 
 28   
 29   
 30  type TcpStreamDriver 
 31    field Int s 
 32    field Int cs 
 33    field Float timeout 
 34    field CBool closeconnectionsocket <- false 
 35  StreamDriver maybe TcpStreamDriver 
 36   
 37   
 38  method drv read buf mini maxi -> red 
 39    arg_rw TcpStreamDriver drv ; arg Address buf ; arg Int mini maxi red 
 40    if drv:timeout=defined 
 41      if (os_socket_wait drv:in drv:timeout)=failure 
 42        return 0 
 43    if os_usesendrecv 
 44      red := os_recv drv:s buf maxi 0 
 45    else 
 46      red := os_read drv:buf maxi 
 47    red := max red 0 
 48   
 49   
 50  method drv write buf mini maxi -> written 
 51    arg_rw TcpStreamDriver drv ; arg Address buf ; arg Int mini maxi written 
 52    if drv:timeout=defined 
 53      if (os_socket_wait drv:out drv:timeout)=failure 
 54        return 0 
 55    if os_usesendrecv 
 56      written := os_send drv:s buf maxi 0 
 57    else 
 58      written := os_write drv:buf maxi 
 59    written := max written 0 
 60   
 61   
 62  method drv flush level -> status 
 63    arg_rw TcpStreamDriver drv ; arg Int level ; arg Status status 
 64    if level=end 
 65      return success 
 66    if os_api="linux" or os_api="posix" 
 67      if level<async 
 68        status := success 
 69      eif level<sync 
 70        status := shunt (os_write drv:null 0)=0 success failure 
 71      else 
 72        status := shunt (os_fsync drv:s)=0 success failure 
 73    else 
 74      status := success 
 75   
 76   
 77  method drv close -> status 
 78    arg_rw TcpStreamDriver drv ; arg ExtendedStatus status 
 79    if trace 
 80      console "closing socket "+(string drv:s)+"[lf]" 
 81    status := shunt (os_close drv:s)=0 success failure 
 82    if drv:closeconnectionsocket 
 83      os_close drv:cs 
 84   
 85   
 86  method drv query command stream answer -> status 
 87    arg_rw TcpStreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg_w Str answer ; arg ExtendedStatus status 
 88    if command="local_ip_address" 
 89      var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size 
 90      if (os_getsockname (shunt drv:s>=drv:drv:cs) addr addrlen)<>0 
 91        return failure 
 92      answer := ip_dot_notation addr:sin_addr 
 93      status := success 
 94    eif command="local_ip_port" 
 95      var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size 
 96      if (os_getsockname (shunt drv:s>=drv:drv:cs) addr addrlen)<>0 
 97        return failure 
 98      answer := 'convert to string' addr:sin_port 
 99      status := success 
 100    eif command="remote_ip_address" 
 101      var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size 
 102      if (os_getpeername drv:addr addrlen)<>0 
 103        return failure 
 104      answer := ip_dot_notation addr:sin_addr 
 105      status := success 
 106    eif command="remote_ip_port" 
 107      var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size 
 108      if (os_getpeername drv:addr addrlen)<>0 
 109        return failure 
 110      answer := 'convert to string' addr:sin_port 
 111      status := success 
 112    eif false # uselibcfornames and command="remote_name" 
 113      var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size 
 114      if (os_getpeername drv:s addr addrlen)<>0 
 115        return failure 
 116      var Pointer:hostent host :> gethostbyaddr (addressof addr:sin_addr) uInt:size AF_INET 
 117      if addressof:host=null 
 118        return failure 
 119      var Int length :=  (cast (memory_search host:h_name 1000 "[0]":characters 1) Int).-.(cast host:h_name Int) 
 120      var Address buffer := memory_allocate length addressof:answer 
 121      memory_copy host:h_name buffer length 
 122      answer set buffer length true 
 123      status := success 
 124    eif command="connection_handle" 
 125      answer := 'convert to string' drv:cs 
 126      status := success 
 127    else 
 128      status := failure 
 129   
 130  method drv configure command stream -> status 
 131    arg_rw TcpStreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg ExtendedStatus status 
 132    if command="connect" 
 133      if drv:s>=0 
 134        return failure 
 135      var os_sockaddr_in addr 
 136      var Int addrlen := os_sockaddr_in size 
 137      drv := os_accept drv:cs addr addrlen 
 138      if drv:s<0 
 139        stream error "failed to connect the tcp server socket to a client socket" 
 140        return failure 
 141      else 
 142        return success 
 143    eif (command parse word:"timeout" drv:timeout) 
 144      # os_fcntl drv:s os_F_SETFL (shunt drv:timeout=defined os_O_NONBLOCK 0) 
 145      status := success 
 146    eif command="shutdown" 
 147      status := shunt (os_shutdown drv:2)=0 success failure 
 148    eif (command parse word:"priority" any:(var Str param)) 
 149      if os_api="linux" 
 150        var Int tos := shunt param="high" os_IPTOS_LOWDELAY param="low" os_IPTOS_LOWCOST 0 
 151        os_setsockopt drv:os_IPPROTO_IP os_IP_TOS addressof:tos Int:size 
 152      status := success 
 153    else 
 154      status := failure 
 155   
 156   
 157 
 
 158   
 159   
 160  (gvar Relation connection_sockets) flags := 4 
 161  gvar Sem connection_sem 
 162  gvar CBool connection_down := false 
 163   
 164  function tcp_server_sockets_off 
 165    connection_sem request 
 166    each connection_sockets 
 167      var Int cs := map Int 
 168      os_shutdown cs 2 
 169      os_close cs 
 170    connection_sockets := var Relation empty_relation 
 171    connection_sockets flags := 4 
 172    connection_down := true 
 173    connection_sem release 
 174   
 175  function tcp_server_sockets_on 
 176    connection_down := false 
 177   
 178  export tcp_server_sockets_off tcp_server_sockets_on 
 179   
 180   
 181  type TcpServerFileSystem 
 182    void 
 183  FileSystem maybe TcpServerFileSystem 
 184   
 185   
 186  method fs open name options flags stream support -> status 
 187    arg_rw TcpServerFileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status 
 188    var Int port ; var CBool close 
 189    if (name eparse "/server/" port) 
 190      close := options option "close" 
 191    eif (name eparse "/server/any") 
 192      port := 0 ; close := true 
 193    else 
 194      if debug 
 195        console "invalid tcp server name: "+name+"[lf]" 
 196      return failure 
 197    connection_sem request 
 198    if connection_down 
 199      connection_sem release 
 200      os_yield 
 201      return failure 
 202    var Pointer:Int pcs :> (connection_sockets query (cast port Address) null) map Int 
 203    var Int cs 
 204    if addressof:pcs<>null 
 205      cs := pcs 
 206      if trace 
 207        console "reuse socket "+string:cs+" for port "+string:port+"[lf]" 
 208      close := false 
 209      var Int queue := options option "queue" Int 
 210      if queue=undefined 
 211        queue := default_queue_length 
 212      os_listen cs queue 
 213    else 
 214      stream_lock_handle 
 215      cs := os_socket os_AF_INET os_SOCK_STREAM 0 
 216      stream_unlock_handle cs 
 217      if cs<0 
 218        if debug 
 219          console "failed to create server socket[lf]" 
 220        connection_sem release 
 221        return failure 
 222      var CBool ok := true 
 223      if changesettings 
 224        if reuse 
 225          var Int optvalue1 := 1 
 226          if (os_setsockopt cs os_SOL_SOCKET os_SO_REUSEADDR addressof:optvalue1 Int:size)<>0 
 227            if debug 
 228              console "failed set server socket os_SO_REUSEADDR attribute[lf]" 
 229            ok := false 
 230        var Int optvalue2 := 1 
 231        if ok and (os_setsockopt cs os_SOL_SOCKET os_SO_KEEPALIVE addressof:optvalue2 Int:size)<>0 
 232          if debug 
 233            console "failed set server socket os_SO_KEEPALIVE attribute[lf]" 
 234          ok := false 
 235      var os_sockaddr_in addr ; var Int addrlen 
 236      addr sin_family := os_AF_INET 
 237      addr sin_addr := 0 
 238      addr sin_port := port 
 239      if ok and (os_bind cs addr os_sockaddr_in:size)<>0 
 240        if debug 
 241          console "failed bind server socket[lf]" 
 242        ok := false 
 243      if ok and port=0 
 244        addrlen := os_sockaddr_in:size 
 245        if (os_getsockname cs addr addrlen)<>0 
 246          ok := false 
 247        port := addr sin_port 
 248      var Int queue := options option "queue" Int 
 249      if queue=undefined 
 250        queue := default_queue_length 
 251      if ok and (os_listen cs queue)<>0 
 252        if debug 
 253          console "failed listen server socket[lf]" 
 254        ok := false 
 255      if not ok 
 256        os_close cs 
 257        connection_sem release 
 258        return failure 
 259      if trace 
 260        console "create connection socket "+string:cs+" for port "+string:port+"[lf]" 
 261      if not close 
 262        pcs :> new Int cs 
 263        connection_sockets define (cast port Address) null addressof:pcs 
 264    connection_sem release 
 265    var Int := -1 
 266    if not (options option "noautoconnect") 
 267      addrlen := os_sockaddr_in size 
 268      stream_lock_handle 
 269      := os_accept cs addr addrlen 
 270      stream_unlock_handle s 
 271      if s<0 
 272        if close 
 273          os_close cs 
 274        if debug 
 275          console "failed to accept client socket[lf]" 
 276        return failure 
 277    var Link:TcpStreamDriver drv :> new TcpStreamDriver 
 278    drv := s 
 279    drv cs := cs 
 280    drv timeout := undefined 
 281    drv closeconnectionsocket := close 
 282    stream stream_driver :> drv 
 283    stream stream_handle := s 
 284    if trace 
 285      console "opening socket "+(string drv:s)+"[lf]" 
 286    status := success 
 287   
 288   
 289 
 
 290   
 291   
 292  type TcpClientFileSystem 
 293    void 
 294  FileSystem maybe TcpClientFileSystem 
 295   
 296   
 297  method fs open name options flags stream support -> status 
 298    arg_rw TcpClientFileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status 
 299    var uInt ipaddress ; var Int port 
 300    if (name eparse "//" (var Int i1) "." (var Int i2) "." (var Int i3) "." (var Int i4) "/client/" port) 
 301      ipaddress := (cast i1 uInt)*256^+ (cast i2 uInt)*256^+ (cast i3 uInt)*256 + (cast i4 uInt) 
 302    eif (name eparse "//" any:(var Str hostname) "/client/" port) 
 303      var Str ip := dns_query_prototype hostname dns_query_function 
 304      if (ip parse (var Int i1) "." (var Int i2) "." (var Int i3) "." (var Int i4)) 
 305        ipaddress := (cast i1 uInt)*256^+ (cast i2 uInt)*256^+ (cast i3 uInt)*256 + (cast i4 uInt) 
 306      else 
 307        if debug 
 308          console "failed to get ip address of "+hostname+"[lf]" 
 309        return failure 
 310      if trace 
 311        console name+" ip address is "+(string ipaddress "radix 16")+"[lf]" 
 312    eif (name eparse "/client/" port) 
 313      ipaddress := 127*256^3+1 
 314    else 
 315      if debug 
 316        console "invalid tcp client name: "+name+"[lf]" 
 317      return failure 
 318    stream_lock_handle 
 319    var Int := os_socket os_AF_INET os_SOCK_STREAM 0 
 320    stream_unlock_handle s 
 321    if s<0 
 322      if debug 
 323        console "failed to create the TCP client socket[lf]" 
 324      return failure 
 325    if changesettings 
 326      var Int optvalue := 1 
 327      if (os_setsockopt os_SOL_SOCKET os_SO_KEEPALIVE addressof:optvalue Int:size)<>0 
 328        if debug 
 329          console "failed set client socket os_SO_KEEPALIVE attribute[lf]" 
 330        os_close s 
 331        return failure 
 332    var os_sockaddr_in addr 
 333    addr sin_family := os_AF_INET 
 334    addr sin_addr := ipaddress 
 335    addr sin_port := port 
 336    var Float timeout := options option "timeout" Float 
 337    if os_api="linux" and timeout<>undefined 
 338      os_fcntl os_F_SETFL os_O_NONBLOCK 
 339      if (os_connect addr os_sockaddr_in:size)<>0 
 340        os_socket_wait out timeout 
 341        if (os_connect addr os_sockaddr_in:size)<>0 
 342          if debug 
 343            console "failed connect TCP client socket to "+name+" (timeout "+string:timeout+")[lf]" 
 344          os_close s 
 345          return failure 
 346      os_fcntl os_F_SETFL 0 
 347    else 
 348      if (os_connect addr os_sockaddr_in:size)<>0 
 349        if debug 
 350          console "failed connect TCP client socket to "+name+"[lf]" 
 351        os_close s 
 352        return failure 
 353    var Link:TcpStreamDriver drv :> new TcpStreamDriver 
 354    drv := s 
 355    drv cs := -1 
 356    drv timeout := timeout 
 357    stream stream_driver :> drv 
 358    stream stream_handle := s 
 359    status := success 
 360   
 361   
 362 
 
 363   
 364   
 365  if os_api="linux" or os_api="posix" 
 366    gvar os_sigaction sa 
 367    sa sa_handler := cast os_SIG_IGN Address 
 368    function record_exception_handler parameter filehandle 
 369      arg Address parameter ; arg Int filehandle 
 370      if (os_sigaction os_SIGPIPE sa (null map os_sigaction))<>0 
 371        error error_id_os "Failed to install Linux pliant_signal exception handler" 
 372    record_exception_handler null 0 
 373    gvar DelayedAction da 
 374    da function :> the_function record_exception_handler Address Int 
 375    pliant_restore_actions append addressof:da 
 376   
 377   
 378 
 
 379   
 380   
 381  gvar TcpClientFileSystem tcp_client_file_system 
 382  pliant_multi_file_system mount "tcp:" "" tcp_client_file_system 
 383   
 384  gvar TcpServerFileSystem tcp_server_file_system 
 385  pliant_multi_file_system mount "tcp:/server/" "/server/" tcp_server_file_system