Patch title: Release 93 bulk changes
Abstract:
File: /language/stream/tcp.pli
Key:
    Removed line
    Added line
# Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 2
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

module "ring.pli"
module "/pliant/language/os/socket.pli"

constant trace false
constant debug false
constant reuse true
constant default_queue_length 16
constant changesettings true
constant waitminimaldelay 0.001


#----------------------------------------------------------------


type TcpStreamDriver
  field Int s
  field Int cs
  field Float timeout
  field CBool closeconnectionsocket <- false
StreamDriver maybe TcpStreamDriver


method drv read buf mini maxi -> red
  arg_rw TcpStreamDriver drv ; arg Address buf ; arg Int mini maxi red
  if drv:timeout=defined
    if (os_socket_wait drv:s in drv:timeout)=failure
      return 0
  if os_usesendrecv
    red := os_recv drv:s buf maxi 0
  else
    red := os_read drv:s buf maxi
  red := max red 0


method drv write buf mini maxi -> written
  arg_rw TcpStreamDriver drv ; arg Address buf ; arg Int mini maxi written
  if drv:timeout=defined
    if (os_socket_wait drv:s out drv:timeout)=failure
      return 0
  if os_usesendrecv
    written := os_send drv:s buf maxi 0
  else
    written := os_write drv:s buf maxi
  written := max written 0


method drv flush level -> status
  arg_rw TcpStreamDriver drv ; arg Int level ; arg Status status
  if level=end
    return success
  if os_api="linux" or os_api="posix"
    if level<async
      status := success
    eif level<sync
      status := shunt (os_write drv:s null 0)=0 success failure
    else
      status := shunt (os_fsync drv:s)=0 success failure
  else
    status := success


method drv close -> status
  arg_rw TcpStreamDriver drv ; arg ExtendedStatus status
  if trace
    console "closing socket "+(string drv:s)+"[lf]"
  status := shunt (os_close drv:s)=0 success failure
  if drv:closeconnectionsocket
    os_close drv:cs


method drv query command stream answer -> status
  arg_rw TcpStreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg_w Str answer ; arg ExtendedStatus status
  if command="local_ip_address"
    var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size
    if (os_getsockname (shunt drv:s>=0 drv:s drv:cs) addr addrlen)<>0
      return failure
    answer := ip_dot_notation addr:sin_addr
    status := success
  eif command="local_ip_port"
    var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size
    if (os_getsockname (shunt drv:s>=0 drv:s drv:cs) addr addrlen)<>0
      return failure
    answer := 'convert to string' addr:sin_port
    status := success
  eif command="remote_ip_address"
    var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size
    if (os_getpeername drv:s addr addrlen)<>0
      return failure
    answer := ip_dot_notation addr:sin_addr
    status := success
  eif command="remote_ip_port"
    var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size
    if (os_getpeername drv:s addr addrlen)<>0
      return failure
    answer := 'convert to string' addr:sin_port
    status := success
  eif false # uselibcfornames and command="remote_name"
    var os_sockaddr_in addr ; var Int addrlen := os_sockaddr_in:size
    if (os_getpeername drv:s addr addrlen)<>0
      return failure
    var Pointer:hostent host :> gethostbyaddr (addressof addr:sin_addr) uInt:size AF_INET
    if addressof:host=null
      return failure
    var Int length :=  (cast (memory_search host:h_name 1000 "[0]":characters 1) Int).-.(cast host:h_name Int)
    var Address buffer := memory_allocate length addressof:answer
    memory_copy host:h_name buffer length
    answer set buffer length true
    status := success
  eif command="connection_handle"
    answer := 'convert to string' drv:cs
    status := success
  else
    status := failure

method drv configure command stream -> status
  arg_rw TcpStreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg ExtendedStatus status
  if command="connect"
    if drv:s>=0
      return failure
    var os_sockaddr_in addr
    var Int addrlen := os_sockaddr_in size
    drv s := os_accept drv:cs addr addrlen
    if drv:s<0
      stream error "failed to connect the tcp server socket to a client socket"
      return failure
    else
      return success
  eif (command parse word:"timeout" drv:timeout)
    # os_fcntl drv:s os_F_SETFL (shunt drv:timeout=defined os_O_NONBLOCK 0)
    status := success
  eif command="shutdown"
    status := shunt (os_shutdown drv:s 2)=0 success failure
  eif (command parse word:"priority" any:(var Str param))
    if os_api="linux"
      var Int tos := shunt param="high" os_IPTOS_LOWDELAY param="low" os_IPTOS_LOWCOST 0
      os_setsockopt drv:s os_IPPROTO_IP os_IP_TOS addressof:tos Int:size
    status := success
  else
    status := failure


#----------------------------------------------------------------


(gvar Relation connection_sockets) flags := 4
gvar Sem connection_sem
gvar CBool connection_down := false

function tcp_server_sockets_off
  connection_sem request
  each a connection_sockets
    var Int cs := a map Int
    os_shutdown cs 2
    os_close cs
  connection_sockets := var Relation empty_relation
  connection_sockets flags := 4
  connection_down := true
  connection_sem release

function tcp_server_sockets_on
  connection_down := false

export tcp_server_sockets_off tcp_server_sockets_on


type TcpServerFileSystem
  void
FileSystem maybe TcpServerFileSystem


method fs open name options flags stream support -> status
  arg_rw TcpServerFileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status
  var Int port ; var CBool close
  if (name eparse "/server/" port)
    close := options option "close"
  eif (name eparse "/server/any")
    port := 0 ; close := true
  else
    if debug
      console "invalid tcp server name: "+name+"[lf]"
    return failure
  connection_sem request
  if connection_down
    connection_sem release
    os_yield
    return failure
  var Pointer:Int pcs :> (connection_sockets query (cast port Address) null) map Int
  var Int cs
  if addressof:pcs<>null
    cs := pcs
    if trace
      console "reuse socket "+string:cs+" for port "+string:port+"[lf]"
    close := false
    var Int queue := options option "queue" Int
    if queue=undefined
      queue := default_queue_length
    os_listen cs queue
  else
    stream_lock_handle
    cs := os_socket os_AF_INET os_SOCK_STREAM 0
    stream_unlock_handle cs
    if cs<0
      if debug
        console "failed to create server socket[lf]"
      connection_sem release
      return failure
    var CBool ok := true
    if changesettings
      if reuse
        var Int optvalue1 := 1
        if (os_setsockopt cs os_SOL_SOCKET os_SO_REUSEADDR addressof:optvalue1 Int:size)<>0
          if debug
            console "failed set server socket os_SO_REUSEADDR attribute[lf]"
          ok := false
      var Int optvalue2 := 1
      if ok and (os_setsockopt cs os_SOL_SOCKET os_SO_KEEPALIVE addressof:optvalue2 Int:size)<>0
        if debug
          console "failed set server socket os_SO_KEEPALIVE attribute[lf]"
        ok := false
    var os_sockaddr_in addr ; var Int addrlen
    addr sin_family := os_AF_INET
    addr sin_addr := 0
    addr sin_port := port
    if ok and (os_bind cs addr os_sockaddr_in:size)<>0
      if debug
        console "failed bind server socket[lf]"
      ok := false
    if ok and port=0
      addrlen := os_sockaddr_in:size
      if (os_getsockname cs addr addrlen)<>0
        ok := false
      port := addr sin_port
    var Int queue := options option "queue" Int
    if queue=undefined
      queue := default_queue_length
    if ok and (os_listen cs queue)<>0
      if debug
        console "failed listen server socket[lf]"
      ok := false
    if not ok
      os_close cs
      connection_sem release
      return failure
    if trace
      console "create connection socket "+string:cs+" for port "+string:port+"[lf]"
    if not close
      pcs :> new Int cs
      connection_sockets define (cast port Address) null addressof:pcs
  connection_sem release
  var Int s := -1
  if not (options option "noautoconnect")
    addrlen := os_sockaddr_in size
    stream_lock_handle
    s := os_accept cs addr addrlen
    stream_unlock_handle s
    if s<0
      if close
        os_close cs
      if debug
        console "failed to accept client socket[lf]"
      return failure
  var Link:TcpStreamDriver drv :> new TcpStreamDriver
  drv s := s
  drv cs := cs
  drv timeout := undefined
  drv closeconnectionsocket := close
  stream stream_driver :> drv
  stream stream_handle := s
  if trace
    console "opening socket "+(string drv:s)+"[lf]"
  status := success


#----------------------------------------------------------------


type TcpClientFileSystem
  void
FileSystem maybe TcpClientFileSystem


method fs open name options flags stream support -> status
  arg_rw TcpClientFileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status
  var uInt ipaddress ; var Int port
  if (name eparse "//" (var Int i1) "." (var Int i2) "." (var Int i3) "." (var Int i4) "/client/" port)
    ipaddress := (cast i1 uInt)*256^3 + (cast i2 uInt)*256^2 + (cast i3 uInt)*256 + (cast i4 uInt)
  eif (name eparse "//" any:(var Str hostname) "/client/" port)
    var Str ip := dns_query_prototype hostname dns_query_function
    if (ip parse (var Int i1) "." (var Int i2) "." (var Int i3) "." (var Int i4))
      ipaddress := (cast i1 uInt)*256^3 + (cast i2 uInt)*256^2 + (cast i3 uInt)*256 + (cast i4 uInt)
    else
      if debug
        console "failed to get ip address of "+hostname+"[lf]"
      return failure
    if trace
      console name+" ip address is "+(string ipaddress "radix 16")+"[lf]"
  eif (name eparse "/client/" port)
    ipaddress := 127*256^3+1
  else
    if debug
      console "invalid tcp client name: "+name+"[lf]"
    return failure
  stream_lock_handle
  var Int s := os_socket os_AF_INET os_SOCK_STREAM 0
  stream_unlock_handle s
  if s<0
    if debug
      console "failed to create the TCP client socket[lf]"
    return failure
  if changesettings
    var Int optvalue := 1
    if (os_setsockopt s os_SOL_SOCKET os_SO_KEEPALIVE addressof:optvalue Int:size)<>0
      if debug
        console "failed set client socket os_SO_KEEPALIVE attribute[lf]"
      os_close s
      return failure
  var os_sockaddr_in addr
  addr sin_family := os_AF_INET
  addr sin_addr := ipaddress
  addr sin_port := port
  var Float timeout := options option "timeout" Float
  if timeout<>undefined
  if os_api="linux" and timeout<>undefined
    os_fcntl s os_F_SETFL os_O_NONBLOCK
    if (os_connect s addr os_sockaddr_in:size)<>0
      os_socket_wait s out timeout
      if (os_connect s addr os_sockaddr_in:size)<>0
        if debug
          console "failed connect TCP client socket to "+name+" (timeout "+string:timeout+")[lf]"
        os_close s
        return failure
    os_fcntl s os_F_SETFL 0
  else
    if (os_connect s addr os_sockaddr_in:size)<>0
      if debug
        console "failed connect TCP client socket to "+name+"[lf]"
      os_close s
      return failure
  var Link:TcpStreamDriver drv :> new TcpStreamDriver
  drv s := s
  drv cs := -1
  drv timeout := timeout
  stream stream_driver :> drv
  stream stream_handle := s
  status := success


#----------------------------------------------------------------


if os_api="linux" or os_api="posix"
  gvar os_sigaction sa
  sa sa_handler := cast os_SIG_IGN Address
  function record_exception_handler parameter filehandle
    arg Address parameter ; arg Int filehandle
    if (os_sigaction os_SIGPIPE sa (null map os_sigaction))<>0
      error error_id_os "Failed to install Linux pliant_signal exception handler"
  record_exception_handler null 0
  gvar DelayedAction da
  da function :> the_function record_exception_handler Address Int
  pliant_restore_actions append addressof:da


#----------------------------------------------------------------


gvar TcpClientFileSystem tcp_client_file_system
pliant_multi_file_system mount "tcp:" "" tcp_client_file_system

gvar TcpServerFileSystem tcp_server_file_system
pliant_multi_file_system mount "tcp:/server/" "/server/" tcp_server_file_system