Patch title: Release 85 bulk changes
Abstract:
File: /pliant/language/stream/stream.pli
Key:
    Removed line
    Added line
# Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 2
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

scope "/pliant/language/stream/" "/pliant/language/" "/pliant/protocol/dns/"
module "ring.pli"
module "/pliant/language/type/set/list.pli"

#----------------------------------------------------------------------
# Stream 


type StreamDriver
  void


type Stream
  field Address read_cur read_stop
  field Address write_cur write_stop

  field Address read_buf
  field Int read_buf_size
  field Address write_buf
  field Int write_buf_size

  field Int flags
  field Link:StreamDriver driver
  field Str name
  field ListNode_ list
  field Int handle

  field Int line_number
  field Int line_limit
  field Str next_line


(addressof:Stream map Type) flags := Stream:flags .or. type_flag_do_not_copy


#----------------------------------------------------------------------


method fs open name options flags stream support -> status
  oarg_rw FileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status
  generic
  status := failure

method drv read buf mini maxi -> red
  oarg_rw StreamDriver drv ; arg Address buf ; arg Int mini maxi red
  generic
  red := 0

method drv write buf mini maxi -> written
  oarg_rw StreamDriver drv ; arg Address buf ; arg Int mini maxi written
  generic
  written := 0

method drv flush level -> status
  oarg_rw StreamDriver drv ; arg Int level ; arg Status status
  generic
  status := success

method drv close -> status
  oarg_rw StreamDriver drv ; arg ExtendedStatus status
  generic
  status := success

method drv query command stream answer -> status
  oarg_rw StreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg_w Str answer ; arg ExtendedStatus status 
  generic
  answer := ""
  status := failure

method drv configure command stream -> status
  oarg_rw StreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg ExtendedStatus status
  generic
  status := failure


export '. open'
export StreamDriver '. read' '. write' '. flush'
export Stream '. line_number' '. line_limit'


#----------------------------------------------------------------------
# other constants and global variables


constant crashed              01000000h
constant unflushed            02000000h
constant next_line_available  04000000h


#----------------------------------------------------------------------
# stream operations


method s is_open -> o
  arg Stream s ; arg CBool o
  o := (s:flags .and. in+out)<>0
  check (shunt o (addressof s:driver)<>null (addressof s:driver)=null)


method s is_crashed -> c
  arg Stream s ; arg CBool c
  c := (s:flags .and. crashed)<>0

method s recover
  arg_rw Stream s
  s:flags := s:flags .and. .not. (cast crashed Int)


method s reset
  arg_rw Stream s
  s read_cur := null ; s read_stop := null
  s write_cur := null ; s write_stop := null
  memory_free s:read_buf ; s read_buf := null
  memory_free s:write_buf ; s write_buf := null
  s flags := s:flags .and. crashed
  s handle := undefined
  s driver :> null map StreamDriver
  s line_number := 0
  s line_limit := 2^16


method s error msg
  arg_rw Stream s ; arg Str msg
  s flags := s:flags .or. crashed
  s read_cur := s read_stop
  s write_cur := s write_stop
  if (s:flags .and. safe)=0
    error error_id_io msg+" ("+s:name+")"


method s write_all_data address size
  arg_rw Stream s ; arg Address address ; arg Int size
  var Address adr := address
  var Int remain := size
  while remain>0
    var Int written := s:driver write adr remain remain
    check written>=0
    if written<=0
      s error "Failed to write to stream"
      return
    else
      s flags := s:flags .or. unflushed
      adr := adr translate Byte written
      remain := remain-written

method s flush level
  arg_rw Stream s ; arg Int level
  if (s:flags .and. out)=0
    s error "Attempted to flush an "+(shunt s:is_open "in" "unopened")+" stream"
  if s:is_crashed
    return
  if s:write_cur<>s:write_buf
    s write_all_data s:write_buf (cast s:write_cur Int).-.(cast s:write_buf Int)
    s write_cur := s write_buf
  if (s:driver flush level)=failure
    s error "Failed to flush stream ("+string:level+")"
  s flags := s:flags .andnot. unflushed
(the_function '. flush' Stream Int) extra_module :> the_module "/pliant/language/stream/flushmode.pli"


method s query command -> answer
  arg_rw Stream s ; arg Str command answer
  if s:is_open
    if (s:driver query command s answer)=failure
      answer := ""
      s error "Failed to query the stream (the query was "+command+")"
  else
    s error "Attempted to query an unopened stream"
    answer := ""

method s safe_query command -> answer
  arg_rw Stream s ; arg Str command answer
  if s:is_open
    if (s:driver query command s answer)=failure
      answer := ""
  else
    answer := ""

method s configure command -> status
  arg_rw Stream s ; arg Str command ; arg ExtendedStatus status
  if s:is_open
    status := s:driver configure command s
    if status=failure
      s error "Failed to configure the stream (the command was "+command+")"
  else
    s error "Attempted to configure an unopened stream"
    status := failure

method s safe_configure command -> status
  arg_rw Stream s ; arg Str command ; arg ExtendedStatus status
  if s:is_open
    status := s:driver configure command s
  else
    status := failure "Not an open stream"


method s raw_read address size
  arg_rw Stream s ; arg Address address ; arg Int size
  check size>=0
  if (s:read_cur translate Byte size)<=s:read_stop
    memory_copy s:read_cur address size
    s read_cur := s:read_cur translate Byte size
  else
    if (s:flags .and. in)=0
      s error "Attempted to read from an "+(shunt s:is_open "out" "unopened")+" stream"
    if (s:flags .and. noautopost)=0 and (s:write_cur<>s:write_buf or (s:flags .and. unflushed)<>0)
      s flush async
    var Address adr := address
    var Int remain := size
    while remain>0
      if s:is_crashed
        memory_clear adr remain
        return
      eif s:read_cur<>s:read_stop
        var Int step := min (cast s:read_stop Int).-.(cast s:read_cur Int) remain
        memory_copy s:read_cur adr step
        s read_cur := s:read_cur translate Byte step ; adr := adr translate Byte step ; remain := remain-step
      eif remain<s:read_buf_size\2
        var Int red := s:driver read s:read_buf remain s:read_buf_size
        check red>=0
        if red<=0
          s error "Failed to read from stream"
        s read_cur := s read_buf ; s read_stop := s:read_buf translate Byte red
      else
        var Int red := s:driver read adr remain remain
        check red>=0
        if red<=0
          s error "Failed to read from stream"
        adr := adr translate Byte red ; remain := remain-red


method s raw_write address size
  arg_rw Stream s ; arg Address address ; arg Int size
  check size>=0
  if (s:write_cur translate Byte size)<=s:write_stop
    memory_copy address s:write_cur size
    s write_cur := s:write_cur translate Byte size
  else
    if (s:flags .and. out)=0
      s error "Attempted to write to an "+(shunt s:is_open "in" "unopened")+" stream"
    if s:is_crashed
      return
    var Int more := min (cast s:write_stop Int).-.(cast s:write_cur Int) size
    check more>=0
    memory_copy address s:write_cur more
    s write_cur := s:write_cur translate Byte more
    s write_all_data s:write_buf (cast s:write_cur Int).-.(cast s:write_buf Int)
    if (s:flags .and. crashed)<>0
      return
    s write_cur := s write_buf
    var Address adr := address translate Byte more
    var Int remain := size-more
    check remain>=0
    while remain>s:write_buf_size\2
      var Int written := s:driver write adr remain-s:write_buf_size\2 remain
      check written>=0
      if written<=0
        s error "Failed to write to stream"
        return
      else
        s flags := s:flags .or. unflushed
        adr := adr translate Byte written ; remain := remain-written
    check remain>=0
    memory_copy adr s:write_cur remain
    s write_cur := s:write_cur translate Byte remain


method s close -> status
  arg_rw Stream s ; arg ExtendedStatus status
  if s:is_open
    if (s:flags .and. out)<>0
      s flush end
    status := s:driver close
    if status=failure
      s error "Failed to close stream"
    s reset
  else
    status := failure


method s open name options flags fs support -> status
  arg_rw Stream s ; arg Str name options ; arg Int flags ; oarg_rw FileSystem fs ; arg_rw Stream support ; arg ExtendedStatus status
  s close
  s name := name
  s flags := flags .or. (shunt (flags .and. append)=append out 0)
  var Int cs := shunt (flags .and. nocache)<>0 0 (flags .and. linecache)<>0 256 (flags .and. bigcache)<>0 65536 4096
  s read_buf_size := cs
  s write_buf_size := cs
  status := fs open name options s:flags s support
  if status=failure and (s:flags .and. out+mkdir)=out+mkdir
    if not (name:len>0 and name:0="[dq]" and (name parse (var Str base) any:(var Str opt)))
      base := name ; opt := ""    
    var Int i := base:len
    while { i := (base 0 i) search_last "/" 0 ; i<>0 and (fs configure string:(base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir")=failure }
      void
    while { i := i+1 ; i := ((base i base:len) search "/" -i)+i ; i<>0 }
      fs configure string:(base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir"
    status := fs open name options s:flags s support
    if status=failure
      i := 0
      while { i := i+1 ; i := ((base i base:len) search "/" -i)+i ; i<>0 }
        fs configure (base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir"
      status := fs open name options s:flags s support
  if status=success
    check (addressof s:driver)<>null
    if (s:flags .and. in)<>0
      s read_buf := memory_allocate s:read_buf_size addressof:s
      s read_cur := s read_buf
      s read_stop := s read_buf
    else
      s read_buf_size := 0
    if (s:flags .and. out)<>0
      s write_buf := memory_allocate s:write_buf_size addressof:s
      s write_cur := s write_buf
      s write_stop := s:write_buf translate Byte s:write_buf_size
    else
      s write_buf_size := 0
  else
    check (addressof s:driver)=null
    s flags := crashed+(flags .and. safe)
(the_function '. open' Stream Str Str Int FileSystem Stream -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"

method s open name flags -> status
  arg_rw Stream s ; arg Str name ; arg Int flags ; arg ExtendedStatus status
  status := s open name "" flags pliant_default_file_system (null map Stream)
(the_function '. open' Stream Str Int -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"
  
method s open name options flags-> status
  arg_rw Stream s ; arg Str name options ; arg Int flags ; arg ExtendedStatus status
  status := s open name options flags pliant_default_file_system (null map Stream)
(the_function '. open' Stream Str Str Int -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"
  

method s atend -> ae
  arg_rw Stream s ; arg CBool ae
  if s:read_cur<>s:read_stop
    return false
  if (s:flags .and. in)=0
    s error "Attempted to test read end on an "+(shunt s:is_open "out" "unopened")+" stream"
  if (s:flags .and. crashed)<>0
    return true
  eif (s:flags .and. next_line_available)<>0
    return false
  else
    if (s:flags .and. noautopost)=0 and (s:write_cur<>s:write_buf or (s:flags .and. unflushed)<>0)
      s flush async
      if (s:flags .and. crashed)<>0
        return true
    var Int red := s:driver read s:read_buf 1 s:read_buf_size
    check red>=0
    s read_cur := s read_buf
    s read_stop := s:read_buf translate Byte red
    return red<=0


method s read_available address size
  arg_rw Stream s ; arg_w Address address ; arg_w Int size
  if not s:atend
    address := s read_cur ; size := (cast s:read_stop Int).-.(cast s:read_cur Int)
    s read_cur := s read_stop
  else
    address := null ; size := 0

method s read_available address size maxi
  arg_rw Stream s ; arg_w Address address ; arg_w Int size ; arg Int maxi
  if not s:atend
    address := s read_cur ; size := min (cast s:read_stop Int).-.(cast s:read_cur Int) maxi
    s read_cur := address translate Byte size
  else
    address := null ; size := 0


method s writechars chars
  arg_rw Stream s ; arg Str chars
  s raw_write chars:characters chars:len


method s readline -> l
  arg_rw Stream s ; arg Str l
  if (s:flags .and. next_line_available)<>0
    l := s next_line
    s next_line := ""
    s flags := s:flags-next_line_available
    return
  l := ""
  if s:atend
    return
  s line_number := s:line_number+1
  while true
    if s:atend
      return
    var Int mode := s:flags .and. cr+lf
    var Address eol := memory_search s:read_cur (cast s:read_stop Int).-.(cast s:read_cur Int) (shunt mode<>cr "[lf]" "[cr]"):characters Char:size
    if  mode=0
      var Address eol_cr := memory_search s:read_cur (cast s:read_stop Int).-.(cast s:read_cur Int) "[cr]":characters Char:size
      var Address eol_lf := eol
      eol :=  shunt eol_cr<>null and (eol_lf=null or (cast eol_cr Int).-.(cast s:read_cur Int)<(cast eol_lf Int).-.(cast s:read_cur Int)) eol_cr eol_lf
    var Int extra := (cast (shunt eol<>null eol s:read_stop) Int) .-. (cast s:read_cur Int)
    check extra>=0
    if l:len+extra>s:line_limit
      s error "too long line"
      l := ""
      return
    l resize l:len+extra
    memory_copy s:read_cur (l:characters translate Char l:len-extra) extra
    if eol<>null
      s read_cur := eol translate Char 1
      if mode=cr+lf and l:len>0 and (l l:len-1)="[cr]"
        l resize l:len-1
      if mode=0 
        if eol=eol_cr and not s:atend and (s:read_cur map Char)="[lf]"
          s read_cur := s:read_cur translate Char 1
          mode := cr+lf
        eif eol=eol_lf
          mode := lf
        else
          check eol=eol_cr
          mode := cr
        if (s:flags .and. anyeol)=0
          s:flags := s:flags .or. mode
      return
    else
      s read_cur := s read_stop


method s unreadline l
  arg_rw Stream s ; arg Str l
  s next_line := l
  s flags := s:flags .or. next_line_available


method s eol
  arg_rw Stream s
  var Int mode := s:flags .and. cr+lf
  var Str eol := shunt mode=0 or mode=lf "[lf]" mode=cr "[cr]" "[cr][lf]"
  s raw_write eol:characters eol:len
  if (s:flags .and. linecache)<>0
    s flush async


method s writeline l
  arg_rw Stream s ; arg Str l
  s writechars l
  s eol


function raw_copy src dest mini maxi -> copyed
  arg_rw Stream src dest ; arg Int mini maxi copyed
  # should use the 2.2 sendfile function when the Linux kernel is >=2.2 and both streams have a handle
  copyed := 0
  while copyed<mini
    src read_available (var Address adr) (var Int size) maxi-copyed
    if size=0
      return
    dest raw_write adr size
    copyed += size


function build s
  arg_w Stream s
  s read_buf := null
  s write_buf := null
  s flags := 0
  s reset
  s:list next :> s list

function destroy s
  arg_w Stream s
  s close

export '. open' '. close' '. raw_read' '. raw_write' '. flush' '. query' '. configure' '. safe_query' '. safe_configure'
export '. writechars' '. readline' '. unreadline' '. eol' '. writeline'
export '. atend' '. read_available' '. error'
export raw_copy


#----------------------------------------------------------------------


function 'cast Status' s -> stat
  arg Stream s ; arg Status stat
  explicit
  stat := shunt (s:flags .and. crashed)=0 success failure
  stat := shunt s:is_open and not s:is_crashed success failure

export 'cast Status'


alias '. stream_handle' '. handle'
alias '. stream_flags' '. flags'
alias '. stream_driver' '. driver'
alias '. stream_read_buf' '. read_buf'
alias '. stream_read_buf_size' '. read_buf_size'
alias '. stream_read_cur' '. read_cur'
alias '. stream_read_stop' '. read_stop'
alias '. stream_write_buf' '. write_buf'
alias '. stream_write_buf_size' '. write_buf_size'
alias '. stream_write_cur' '. write_cur'
alias '. stream_write_stop' '. write_stop'
export '. stream_handle' '. stream_flags' '. stream_driver'
export '. stream_read_buf' '. stream_read_buf_size' '. stream_read_cur' '. stream_read_stop'
export '. stream_write_buf' '. stream_write_buf_size' '. stream_write_cur' '. stream_write_stop'

alias stream_flag_unflushed unflushed
alias stream_flag_crashed crashed
export stream_flag_unflushed stream_flag_crashed

export '. name' '. is_open' '. is_crashed' '. recover'