abstract [This module is expected to help create various servers using TCP as a communication layer: it handles opening a connection socket, waiting for a client, then creating a new thread to deal with the client.]
# Copyright Hubert Tonneau hubert.tonneau@pliant.cx # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License version 2 # as published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # version 2 along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# scope "/pliant/" module "/pliant/language/stream.pli" module "/pliant/language/stream/tcp.pli" module "/pliant/language/compiler.pli" module "/pliant/language/os.pli" submodule "/pliant/language/data/fields.pli"
module "/pliant/language/context.pli" module "/pliant/language/schedule/resourcesem.pli"
# Global resource semaphore used by main_loop: it queries the maximum to size
# the listening queue and takes one unit for each client served in its own thread.
# It is configured with memory_assigned\2^20 cast to Int, clamped to the range 64..1024.
# NOTE(review): presumably memory_assigned is in bytes, i.e. one unit per megabyte - confirm.
public gvar ResourceSem tcp_resource tcp_resource configure (max (min (cast memory_assigned\2^20 Int) 1024) 64)
# Compile-time switches:
#   quiet                   : when true, the console messages guarded by 'if not quiet' in start and stop are not printed
#   sequential              : when true, main_loop services every client in the accepting thread instead of starting a thread
#   secured_channel_support : when true, the crypto channel module is loaded and the server fields include 'secured' and 'secured_port'
constant quiet false constant sequential false constant secured_channel_support true
# the crypto channel module is only loaded when secured channels are enabled
if secured_channel_support module "/pliant/util/crypto/channel.pli"
# Template listing the fields of a TCP server type; 'n' and 'p' are placeholders
# that tcp_server_fields substitutes with the server name and the port.
#   name            : server name, used in console messages and 'part' labels
#   channel         : stream names the server listens on; start adds "tcp:/server/<port>" when the array is empty
#   port            : TCP port number
#   detached        : when false, start runs the main loop of channel 0 in the calling thread
#   since           : set to the current datetime by record, reset to undefined by unrecord
#   welcome_message : when not empty, printed by start
#   ports_count     : set by start to the number of channels, decremented by main_loop
#   clients_count   : incremented when a connection is accepted, decremented once it has been handled
#   please_stop     : set by stop to make the main loops exit, then cleared again
#   secured / secured_port : only when secured_channel_support; start adds "channel:/server/<secured_port>" when secured is true
# NOTE(review): the lone '|' lines are kept as found; their meaning cannot be determined from this file.
named_expression declare_fields field Str name <- n field Array:Str channel field Int port <- p field CBool detached <- false field DateTime since
|
field Str welcome_message
|
field Int ports_count <- 0 field Int clients_count <- 0 field CBool please_stop <- false if secured_channel_support field CBool secured <- false field Int secured_port <- undefined
# Meta that expands to the server field declarations.
# Expects exactly two arguments: a constant Str (the server name) and a constant Int (the port).
# With any other arguments it returns without compiling the expression.
# Otherwise it compiles a duplicate of declare_fields with 'n' replaced by the first
# argument and 'p' replaced by the second.
meta tcp_server_fields e if e:size<>2 or (e:0 constant Str)=null or (e:1 constant Int)=null return e compile_as (expression duplicate declare_fields substitute n e:0 substitute p e:1)
# Base server type: the fields produced by tcp_server_fields with an empty name and port 0.
public type TcpServer tcp_server_fields "" 0
# tcp_servers_list : addresses of the recorded servers (appended by record, removed by unrecord)
# tcp_servers_sem  : requested around every access to tcp_servers_list in record, unrecord and tcp_servers_stop
# tcp_shutdown_sem : requested then released by a non detached start once its main loop has returned
# NOTE(review): presumably tcp_shutdown_sem is held elsewhere while the process shuts down - confirm.
public gvar List tcp_servers_list gvar Sem tcp_servers_sem gvar Sem tcp_shutdown_sem
# Registers the server in the global servers list.
# Locks the server entry (unrecord unlocks it), stamps 'since' with the current
# date and time, and appends the server address to tcp_servers_list while
# holding tcp_servers_sem.
method server record
  arg_rw TcpServer server
  entry_lock addressof:server
  tcp_servers_sem request
  server:since := datetime
  tcp_servers_list append addressof:server
  tcp_servers_sem release
# Removes the server from the global servers list.
# While holding tcp_servers_sem, resets 'since' to undefined and removes every
# list entry equal to the server address; then unlocks the server entry.
method server unrecord
  arg_rw TcpServer server
  tcp_servers_sem request
  server:since := undefined
  var Pointer:Arrow cursor :> tcp_servers_list first
  while cursor<>null
    if cursor=addressof:server
      # 'remove' result is used as the next position to examine
      cursor :> tcp_servers_list remove cursor
    else
      cursor :> tcp_servers_list next cursor
  tcp_servers_sem release
  entry_unlock addressof:server
# Generic hook called by start before the server is recorded; start gives up
# with failure when this returns failure. The default implementation returns success.
method server start_checkup -> status
  arg_rw TcpServer server
  arg Status status
  generic
  status := success
# Generic hook called by stop right after please_stop has been set.
# The default implementation does nothing.
method server stop_checkup
  arg_rw TcpServer server
  generic
# Generic hook called by main_loop for each accepted connection, with the
# connection stream. The default implementation does nothing.
method server service stream
  arg_rw TcpServer server
  arg_rw Stream stream
  generic
# Accept loop for one listening channel.
# Reads the maximum of tcp_resource and passes it as the "queue <maxi>" open option.
# Until please_stop is set, it allocates a Stream, locks its entry and opens 'channel':
#   - on success, clients_count is incremented, then
#       * if 'sequential' is true or no tcp_resource unit can be taken without waiting,
#         the client is serviced in this thread, the stream is closed and clients_count decremented;
#       * otherwise a thread is started which services the client, closes the stream,
#         decrements clients_count and releases the tcp_resource unit; the 'failure' branch
#         does the same cleanup, prints "threads overflow" and sleeps 2 seconds;
#   - on open failure, only the stream entry is unlocked.
# The last statement decrements ports_count.
# NOTE(review): indentation was lost in this copy, so the nesting described above is inferred
# from statement order (in particular that the final atomic_add runs after the loop) - confirm.
method server main_loop channel oarg_rw TcpServer server ; arg Str channel tcp_resource query (var Int current) (var Int maxi) var Str options := "queue "+string:maxi part connect "receive "+server:name+" connections" while not server:please_stop var Link:Stream s :> new Stream ; entry_lock addressof:s s open channel options in+out+safe+cr+lf if s=success atomic_increment server:clients_count if sequential or not (tcp_resource nowait_request 1) part service "service "+server:name+" request from "+(s safe_query "remote_ip_address") server service s s close ; entry_unlock addressof:s atomic_add server:clients_count (-1) else safe thread part service2 "service "+server:name+" request from "+(s safe_query "remote_ip_address") share:server service s s close ; entry_unlock addressof:s atomic_add server:clients_count (-1) tcp_resource release 1 failure s close ; entry_unlock addressof:s atomic_add server:clients_count (-1) tcp_resource release 1 console "threads overflow[lf]" sleep 2 else entry_unlock addressof:s # probably a client that sent an open connection request, then vanished # not a server overflow atomic_add server:ports_count -1
# Starts the server.
#   1. Returns at once when 'since' is defined (the server has already been recorded).
#      NOTE(review): this early return does not assign 'status' - confirm what callers receive.
#   2. When no channel is configured: adds "tcp:/server/<port>", then tries to connect to
#      "tcp:/client/<port>"; a successful connection means the port is already served, so the
#      method reports it (unless quiet) and returns failure. It then opens channel 0 with
#      "noautoconnect" and returns failure if that fails.
#      When channels were already configured, 'port' is set to undefined instead.
#   3. With secured_channel_support and 'secured' set: secured_port defaults to port+500 and
#      "channel:/server/<secured_port>" is added to the channels.
#   4. Returns failure when start_checkup fails; otherwise records the server and prints a banner.
#   5. Sets ports_count to the number of channels and starts one thread running main_loop for each
#      channel from the last one down to index 1 (down to index 0 when detached).
#   6. When not detached, runs main_loop for channel 0 in the calling thread, then requests and
#      releases tcp_shutdown_sem.
#   7. Sets status to success.
# NOTE(review): the lone '|' lines and the two different banner fragments are kept exactly as
# found; which of them belongs to the method cannot be determined from this copy.
method server start -> status oarg_rw TcpServer server ; arg Status status if server:since=defined return if server:channel:size=0 server channel += "tcp:/server/"+(string server:port) var Link:Stream s :> new Stream s open "tcp:/client/"+(string server:port) in+out+safe if s=success if not quiet
|
compile_message ""
|
console "TCP port "+(string server:port)+" is used by another server." eol return failure s open server:channel:0 "noautoconnect" in+out+safe if s=failure return failure s :> null map Stream else server port := undefined if secured_channel_support if server:secured if server:secured_port=undefined server:secured_port := server:port+500 server channel += "channel:/server/"+(string server:secured_port) if (server start_checkup)=failure return failure server record
|
if not quiet console server:name " server is running" (shunt server:port=defined " on TCP port "+(string server:port) "") "." if secured_channel_support if server:secured console " (secured port " server:secured_port ")" if server:detached console " (detached)" console eol
|
compile_message "" if server:welcome_message<>"" console server:welcome_message eol eif not quiet console server:name " server is running." eol
|
server:ports_count := server:channel:size for (var Int i) server:channel:size-1 (shunt server:detached 0 1) step -1 thread share:server main_loop server:channel:i if not server:detached server main_loop server:channel:0 tcp_shutdown_sem request ; tcp_shutdown_sem release status := success
# Stops the server, waiting at most 'timeout' seconds in total.
#   1. Sets please_stop and calls stop_checkup.
#   2. For each channel, from last to first, derives the client side stream name
#      ("/server/" -> "/client/", "channel:" -> "tcp:"), opens it and closes it.
#      NOTE(review): presumably this dummy connection wakes up the main_loop waiting in 'open' - confirm.
#   3. Polls every 0.1 second until ports_count is 0, then until clients_count is 0,
#      giving up on either wait once 'timeout' seconds have elapsed since the call.
#   4. Clears please_stop (even when a wait timed out), unrecords the server and,
#      unless quiet, prints a message.
# NOTE(review): the lone '|' lines and the two different final messages are kept exactly as found.
method server stop timeout oarg_rw TcpServer server ; arg Float timeout var DateTime start := datetime server please_stop := true server stop_checkup for (var Int i) server:channel:size-1 0 step -1 var Str name := server:channel:i name := replace name "/server/" "/client/" name := replace name "channel:" "tcp:" (var Stream s) open name in+out+safe s close while server:ports_count<>0 and datetime:seconds-start:seconds<timeout sleep 0.1 while server:clients_count<>0 and datetime:seconds-start:seconds<timeout sleep 0.1 server please_stop := false server unrecord if not quiet
|
console server:name " server is down." eol
|
console server:name " server stopped." eol
|
# Stops every recorded TCP server, sharing one overall time budget.
#   timeout : total number of seconds allowed; each server is stopped with
#             what remains of it (the remainder may be zero or negative, in
#             which case 'stop' does not wait).
# tcp_server_sockets_off / tcp_server_sockets_on are called around the whole
# operation. NOTE(review): they are defined outside this file; presumably they
# disable and re-enable server side TCP sockets - confirm.
function tcp_servers_stop timeout
  arg Float timeout
  var DateTime start := datetime
  tcp_server_sockets_off
  while tcp_servers_list:first<>null
    # pick the head of the list while holding the semaphore, but stop the
    # server after releasing it: stop calls unrecord, which requests
    # tcp_servers_sem itself
    tcp_servers_sem request
    var Link:TcpServer server :> tcp_servers_list:first map TcpServer
    tcp_servers_sem release
    # the list may have been emptied between the loop test and the request
    if addressof:server<>null
      var Float elapsed := datetime:seconds-start:seconds
      server stop timeout-elapsed
  tcp_server_sockets_on
# define_tcp_server <Type> <identifier>
# Expects exactly two arguments: a constant Type and an identifier; with anything else it returns
# without compiling. Otherwise it compiles a duplicate of construct_tcp_server in which 'name' is
# replaced by the identifier and 'type' by the type.
# construct_tcp_server is a template defining a meta called 'name' which:
#   - emits an entry_new call for 'type' and keeps the resulting address in a local Address argument,
#   - uses an indirect argument through that address as the server object,
#   - calls parse_fields on the expression for that server object (returns without compiling on failure),
#   - emits a call to '. start' on the server, with a local Status receiving the result,
#   - sets the server object as the read/write result of the expression.
# NOTE(review): indentation was lost in this copy; whether the named_expression is really nested
# after the 'return' of the meta, as the statement order suggests, should be confirmed.
meta define_tcp_server e if e:size<>2 or (e:0 constant Type)=null or e:1:ident="" return named_expression construct_tcp_server meta name e var Link:Argument adr :> argument local Address var Link:Argument server :> argument indirect type adr 0 e add (instruction (the_function entry_new Type -> Address) (argument mapped_constant Type type) adr) if (e parse_fields server 0 access_object)=failure return e add (instruction (the_function '. start' TcpServer -> Status) server (argument local Status)) e set_result server access_read+access_write e compile_as (expression duplicate construct_tcp_server substitute name e:1 substitute type e:0)
# Public interface of the module: the server methods applications call or override, the
# field and server definition metas, the secured channel switch, and the global stop function.
export '. service' export '. start' '. stop' export '. start_checkup' '. stop_checkup' export tcp_server_fields secured_channel_support define_tcp_server export tcp_servers_stop
|