Patch title: Release 94 bulk changes
Abstract:
File: /pliant/util/pml/multiplexer.pli
Key:
    Removed line
    Added line
module "/pliant/language/compiler.pli"
module "/pliant/language/context.pli"
module "/pliant/protocol/dns/name.pli"
module "/pliant/language/stream.pli"
module "/pliant/util/pml/io.pli"
module "/pliant/util/pml/channel.pli"
module "/pliant/language/stream/filesystembase.pli"

# recommended TCP ports are 36 and 8036


# Console tracing switches used by multiplexer_port:
#   trace  guards the "threads overflow" and "failed to listen" messages
#   trace2 guards the per-request diagnostics (secured setup failure,
#          unknown service, invalid stream)
constant trace true
constant trace2 true


# service name -> handler function; filled by multiplexer_service and looked
# up by the request threads started in multiplexer_port
gvar (Dictionary Str Link:Function) services
# stream names registered through multiplexer_port; an entry is appended to
# both lists at registration and removed from both when the listener ends
gvar List:Str server_ports client_ports
# taken (request / rd_request) around accesses to 'services' and the port lists
gvar Sem sem
# incremented for each accepted connection and decremented once it has been
# handled; multiplexer_shutdown waits for it to come back to 0
gvar Int counter := 0
# set by multiplexer_shutdown, cleared by multiplexer_port; listener loops
# stop when it is true
gvar CBool shutdown
# set to the current time by multiplexer_port if still undefined at that point
gvar DateTime multiplexer_running_since := undefined


# Calling convention for service handlers: multiplexer_port invokes it as
# 'service_prototype s fun' with the connection stream and the function
# registered for the requested service.
# NOTE(review): the body is just 'indirect'; presumably the call is forwarded
# to 'fun' with 'stream' as its argument -- confirm against the Pliant docs.
function service_prototype stream fun
  arg_rw Stream stream ; arg Function fun
  indirect


# Copies data between the two streams in both directions.
# A helper thread forwards what arrives on 'd' to 's'; the calling thread
# forwards what arrives on 's' to 'd'. Each direction stops as soon as
# read_available reports a size of 0, then configures "shutdown" on the
# stream it was writing to.
function proxy s d
  arg_rw Link:Stream s d
  # d -> s, in a separate thread
  thread
    var Address back_adr ; var Int back_size
    d read_available back_adr back_size
    while back_size<>0
      s raw_write back_adr back_size
      s flush anytime
      d read_available back_adr back_size
    s safe_configure "shutdown"
  # s -> d, in the calling thread
  var Address fwd_adr ; var Int fwd_size
  s read_available fwd_adr fwd_size
  while fwd_size<>0
    d raw_write fwd_adr fwd_size
    d flush anytime
    s read_available fwd_adr fwd_size
  d safe_configure "shutdown"


# Registers 'fun' as the handler of the service called 'name'.
# The 'services' dictionary is only touched while 'sem' is held.
function multiplexer_service name fun
  arg Str name
  arg Function fun
  var Link:Function handler :> fun
  sem request
  services insert name handler
  sem release


# Registers a listening port and starts a background thread that accepts
# requests on it and hands each one to the matching registered service.
#   server_port  stream name opened repeatedly to obtain incoming connections
#   client_port  stream name recorded alongside it; multiplexer_shutdown
#                opens every recorded client_port (presumably to wake the
#                listener up so that it notices 'shutdown')
# Nothing is done if server_port is already registered.
function multiplexer_port server_port client_port
  arg Str server_port client_port
  sem request
  each p server_ports
    if p=server_port
      # already registered: release the semaphore and leave
      sem release
      return
  server_ports += server_port
  client_ports += client_port
  sem release
  # registering a port clears any previous shutdown request
  shutdown := false
  thread
    part mx "multiplexer port handler"
      var CBool continue := true
      # accept loop: one fresh stream per incoming connection
      while continue and not shutdown
        var Link:Stream s :> new Stream
        s open server_port in+out+safe
        if s=success
          # one more connection in progress
          atomic_increment counter
          # the 'failure' branch below reports "threads overflow", so this
          # 'safe' section presumably covers a failure to start the thread
          safe
            thread
              part "pml request from "+(s safe_query "remote_ip_address")
                # inspect the beginning of the request inside a rewind section
                s rewind_open
                if (s itag "pliant") and (s iattr "service" (var Str service))
                  var CBool secured := s iattr "secured"
                  part service "service "+service+(shunt secured " secured" "")+" request"
                    # go back to the start of the stream before handing it over
                    s rewind
                    s rewind_close
                    # look the handler up under the read lock
                    sem rd_request
                    var (Pointer Link:Function) pfun :>> services first service
                    var Link:Function fun
                    if exists:pfun
                      fun :> pfun
                    else
                      fun :> null map Function
                    sem rd_release
                    if exists:fun
                      if secured
                        # layer a "channel2:/server/0" stream over the raw
                        # connection and continue with it in place of 's'
                        var Link:Stream s2 :> new Stream
                        var ExtendedStatus status := s2 open "channel2:/server/0" "" in+out+safe pliant_default_file_system s
                        s :> s2 ; s2 :> null map Stream
                        if status=success
                          service_prototype s fun
                        else
                          if trace2
                            console "failed to setup secured connection: "+status:message eol
                      else
                        service_prototype s fun
                    else
                      if trace2
                        console "there is no '"+service+"' service[lf]"
                else
                  if trace2
                    console "invalid stream[lf]"
                  s rewind_close
              # request finished (successfully or not): close and uncount it
              s close
              atomic_add counter (-1)
          failure
            # drop this connection, undo the count, and pause before accepting again
            s close
            atomic_add counter (-1)
            if trace
              console "threads overflow[lf]"
            sleep 2
        else
          # could not open the server port: stop this listener
          if trace
            console "failed to listen for clients ("+server_port+")[lf]"
          continue := false
      # listener is ending: remove this port pair from both lists
      sem request
      var Pointer:Str p :> server_ports first
      while exists:p and p<>server_port
        p :> server_ports next p
      if exists:p
        server_ports remove p
      var Pointer:Str p :> client_ports first
      while exists:p and p<>client_port
        p :> client_ports next p
      if exists:p
        client_ports remove p
      sem release
  # record the start time the first time a listener is launched
  if multiplexer_running_since=undefined
    multiplexer_running_since := datetime


# Reads the 'port' entry stored for this computer (computer_fullname) in the
# name database and returns it. When the value is defined, a listener is also
# started on that TCP port through multiplexer_port.
function multiplexer_server -> port
  arg Int port
  port := name_database:data:host:computer_fullname:port
  if port<>undefined
    # same port number on both the listening and the local client side
    var Str port_text := string:port
    multiplexer_port "tcp:/server/"+port_text "tcp://127.0.0.1/client/"+port_text

# Asks every listener to stop, then waits for running requests to finish.
#   timeout  maximum number of seconds to wait for 'counter' to reach 0
# After raising 'shutdown', each recorded client port is opened and closed
# once, then 'counter' is polled every 0.1 second until it is 0 or the
# timeout has elapsed.
function multiplexer_shutdown timeout
  arg Float timeout
  shutdown := true
  var DateTime began := datetime
  sem rd_request
  each port client_ports
    var Stream wake
    wake open port in+out+safe
    wake close
  sem rd_release
  while counter<>0 and (datetime:seconds-began:seconds)<timeout
    sleep 0.1


export multiplexer_service multiplexer_port multiplexer_server multiplexer_shutdown multiplexer_running_since