/pliant/protocol/common/tcp_server.pli
 
 1  abstract 
 2    [This module is expected to help create various servers using TCP as a communication layer: it handles opening a connection socket, waiting for a client, then creating a new thread to deal with the client.] 
 3   
 4  # Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx 
 5  # 
 6  # This program is free software; you can redistribute it and/or 
 7  # modify it under the terms of the GNU General Public License version 2 
 8  # as published by the Free Software Foundation. 
 9  # 
 10  # This program is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU General Public License 
 16  # version 2 along with this program; if not, write to the Free Software 
 17  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. 
 18   
 19  # scope "/pliant/" 
 20  module "/pliant/language/stream.pli" 
 21  module "/pliant/language/stream/tcp.pli" 
 22  module "/pliant/language/compiler.pli" 
 23  module "/pliant/language/os.pli" 
 24  submodule "/pliant/language/data/fields.pli" 
 25   
 26  module "/pliant/language/context.pli" 
 27  module "/pliant/language/schedule/resourcesem.pli" 
 28   
 29   
 30  public 
 31    gvar ResourceSem tcp_resource 
 32  tcp_resource configure (max (min (cast memory_assigned\2^20 Int) 1024) 64) 
 33   
 34  constant quiet false 
 35  constant sequential false 
 36  constant secured_channel_support true 
 37   
 38  if secured_channel_support 
 39    module "/pliant/util/crypto/channel.pli" 
 40   
 41   
 42  named_expression declare_fields 
 43    field Str name <- n 
 44    field Array:Str channel 
 45    field Int port <- p 
 46    field CBool detached <- false 
 47    field DateTime since 
 48    field Str welcome_message 
 49    field Int ports_count <- 0 
 50    field Int clients_count <- 0 
 51    field CBool please_stop <- false 
 52    if secured_channel_support 
 53      field CBool secured <- false 
 54      field Int secured_port <- undefined 
 55   
 56  meta tcp_server_fields e 
 57    if e:size<>or (e:constant Str)=null or (e:constant Int)=null 
 58      return 
 59    compile_as (expression duplicate declare_fields substitute n e:0 substitute p e:1) 
 60   
 61   
 62  public 
 63    type TcpServer 
 64      tcp_server_fields "" 0 
 65   
 66  public 
 67    gvar List tcp_servers_list 
 68    gvar Sem tcp_servers_sem 
 69    gvar Sem tcp_shutdown_sem 
 70   
 71  method server record 
 72    arg_rw TcpServer server 
 73    entry_lock addressof:server 
 74    tcp_servers_sem request 
 75    server since := datetime 
 76    tcp_servers_list append addressof:server 
 77    tcp_servers_sem release 
 78   
 79  method server unrecord 
 80    arg_rw TcpServer server 
 81    tcp_servers_sem request 
 82    server since := undefined 
 83    var Pointer:Arrow :> tcp_servers_list first 
 84    while c<>null 
 85      if c=addressof:server 
 86        :> tcp_servers_list remove c 
 87      else 
 88        :> tcp_servers_list next c 
 89    tcp_servers_sem release 
 90    entry_unlock addressof:server 
 91   
 92   
 93  method server start_checkup -> status 
 94    arg_rw TcpServer server ; arg Status status 
 95    generic 
 96    status := success 
 97   
 98  method server stop_checkup 
 99    arg_rw TcpServer server 
 100    generic 
 101   
 102  method server service stream 
 103    arg_rw TcpServer server ; arg_rw Stream stream 
 104    generic 
 105   
 106   
 107  method server main_loop channel 
 108    oarg_rw TcpServer server ; arg Str channel 
 109    tcp_resource query (var Int current) (var Int maxi) 
 110    var Str options := "queue "+string:maxi 
 111    part connect "receive "+server:name+" connections" 
 112      while not server:please_stop 
 113        var Link:Stream :> new Stream 
 114        open channel options in+out+safe+cr+lf 
 115        if s=success 
 116          atomic_increment server:clients_count 
 117          if sequential or not (tcp_resource nowait_request 1) 
 118            part service "service "+server:name+" request from "+(safe_query "remote_ip_address") 
 119              server service s 
 120            close 
 121            atomic_add server:clients_count (-1) 
 122          else 
 123            safe 
 124              thread 
 125                part service2 "service "+server:name+" request from "+(safe_query "remote_ip_address") 
 126                  share:server service s 
 127                close 
 128                atomic_add server:clients_count (-1) 
 129                tcp_resource release 1 
 130            failure 
 131              close 
 132              atomic_add server:clients_count (-1) 
 133              tcp_resource release 1 
 134              console "threads overflow[lf]" 
 135              sleep 2 
 136    atomic_add server:ports_count -1 
 137   
 138   
 139  method server start -> status 
 140    oarg_rw TcpServer server ; arg Status status 
 141    if server:since=defined 
 142      return 
 143    if server:channel:size=0 
 144      server channel += "tcp:/server/"+(string server:port) 
 145      var Link:Stream :> new Stream 
 146      open "tcp:/client/"+(string server:port) in+out+safe 
 147      if s=success 
 148        if not quiet 
 149          compile_message "" 
 150          console "TCP port "+(string server:port)+" is used by another server." eol 
 151        return failure 
 152      open server:channel:"noautoconnect" in+out+safe 
 153      if s=failure 
 154        return failure 
 155      :> null map Stream 
 156    else 
 157      server port := undefined 
 158    if secured_channel_support 
 159      if server:secured 
 160        if server:secured_port=undefined 
 161          server:secured_port := server:port+500 
 162        server channel += "channel:/server/"+(string server:secured_port) 
 163    if (server start_checkup)=failure 
 164      return failure 
 165    server record 
 166    compile_message "" 
 167    if server:welcome_message<>"" 
 168      console server:welcome_message eol 
 169    eif not quiet 
 170      console server:name " server is running." eol 
 171    server:ports_count := server:channel:size 
 172    for (var Int i) server:channel:size-1 (shunt server:detached 0 1) step -1 
 173      thread 
 174        share:server main_loop server:channel:i 
 175    if not server:detached 
 176      server main_loop server:channel:0 
 177      tcp_shutdown_sem request ; tcp_shutdown_sem release 
 178    status := success 
 179   
 180   
 181  method server stop timeout 
 182    oarg_rw TcpServer server ; arg Float timeout 
 183    var DateTime start := datetime 
 184    server please_stop := true 
 185    server stop_checkup 
 186    for (var Int i) server:channel:size-1 0 step -1 
 187      var Str name := server:channel:i 
 188      name := replace name "/server/" "/client/" 
 189      name := replace name "channel:" "tcp:" 
 190      (var Stream s) open name in+out+safe 
 191      close 
 192    while server:ports_count<>and datetime:seconds-start:seconds<timeout 
 193      sleep 0.1 
 194    while server:clients_count<>and datetime:seconds-start:seconds<timeout 
 195      sleep 0.1 
 196    server please_stop := false 
 197    server unrecord 
 198    if not quiet 
 199      console server:name " server stopped." eol 
 200   
 201  function tcp_servers_stop timeout 
 202    arg Float timeout 
 203    var DateTime start := datetime 
 204    tcp_server_sockets_off 
 205    while tcp_servers_list:first<>null 
 206      tcp_servers_sem request 
 207      var Pointer:Arrow :> tcp_servers_list first 
 208      var Link:TcpServer server :> tcp_servers_list:first map TcpServer 
 209      tcp_servers_sem release 
 210      if addressof:server<>null 
 211        var Float elapsed := datetime:seconds-start:seconds 
 212        server stop timeout-elapsed 
 213    tcp_server_sockets_on 
 214   
 215   
 216  meta define_tcp_server e 
 217    if e:size<>or (e:constant Type)=null or e:1:ident="" 
 218      return 
 219    named_expression construct_tcp_server 
 220      meta name e 
 221        var Link:Argument adr :> argument local Address 
 222        var Link:Argument server :> argument indirect type adr 0 
 223        e add (instruction (the_function entry_new Type -> Address) (argument mapped_constant Type type) adr) 
 224        if (e parse_fields server 0 access_object)=failure 
 225          return 
 226        e add (instruction (the_function '. start' TcpServer -> Status) server (argument local Status)) 
 227        e set_result server access_read+access_write 
 228    compile_as (expression duplicate construct_tcp_server substitute name e:1 substitute type e:0) 
 229   
 230   
 231  export '. service' 
 232  export '. start' '. stop' 
 233  export '. start_checkup' '. stop_checkup' 
 234  export tcp_server_fields secured_channel_support define_tcp_server 
 235  export tcp_servers_stop 
 236   
 237   
 238   
 239   
 240   
 241