Patch title: Release 94 bulk changes
Abstract:
File: /pliant/language/stream/stream.pli
Key:
    Removed line
    Added line
# Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 2
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

scope "/pliant/language/stream/" "/pliant/language/" "/pliant/protocol/dns/"
module "ring.pli"
module "/pliant/language/type/set/list.pli"

constant has_unreadline true
constant has_rewind true


#----------------------------------------------------------------------
# Stream 


type StreamDriver
  void

if has_rewind

  type StreamBuffer
    field Address start stop

  type StreamMark
    field Pointer:StreamBuffer buffer
    field Int offset   


type Stream
  field Address read_cur read_stop
  field Address write_cur write_stop

  field Address read_buf
  field Int read_buf_size
  field Address write_buf
  field Int write_buf_size

  field Int flags
  field Link:StreamDriver driver
  field Str name
  field ListNode_ list
  field Int handle

  field Int line_number
  field Int line_limit
  field Str next_line
  if has_unreadline
    field Str next_line
  
  if has_rewind
    field List:StreamBuffer rewind_buffers
    field Pointer:StreamBuffer rewind_current
    field Address rewind_stop # backup of read_stop
    field Array:StreamMark rewind_stack ; field Int rewind_count
    field Int rewind_size rewind_limit


(addressof:Stream map Type) flags := Stream:flags .or. type_flag_do_not_copy


#----------------------------------------------------------------------


method fs open name options flags stream support -> status
  oarg_rw FileSystem fs ; arg Str name options ; arg Int flags ; arg_rw Stream stream support ; arg ExtendedStatus status
  generic
  status := failure

method drv read buf mini maxi -> red
  oarg_rw StreamDriver drv ; arg Address buf ; arg Int mini maxi red
  generic
  red := 0

method drv write buf mini maxi -> written
  oarg_rw StreamDriver drv ; arg Address buf ; arg Int mini maxi written
  generic
  written := 0

method drv flush level -> status
  oarg_rw StreamDriver drv ; arg Int level ; arg Status status
  generic
  status := success

method drv close -> status
  oarg_rw StreamDriver drv ; arg ExtendedStatus status
  generic
  status := success

method drv query command stream answer -> status
  oarg_rw StreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg_w Str answer ; arg ExtendedStatus status 
  generic
  answer := ""
  status := failure

method drv configure command stream -> status
  oarg_rw StreamDriver drv ; arg Str command ; arg_rw Stream stream ; arg ExtendedStatus status
  generic
  status := failure


export '. open'
export StreamDriver '. read' '. write' '. flush'
export Stream '. line_number' '. line_limit'


#----------------------------------------------------------------------
# other constants and global variables


constant crashed              01000000h
constant unflushed            02000000h
constant next_line_available  04000000h
constant crashed                01000000h
constant unflushed              02000000h
if has_unreadline
  constant next_line_available  04000000h
if has_rewind
  constant rewind_is_active    08000000h


#----------------------------------------------------------------------
# stream operations


method s is_open -> o
  arg Stream s ; arg CBool o
  o := (s:flags .and. in+out)<>0
  check (shunt o (addressof s:driver)<>null (addressof s:driver)=null)


method s is_crashed -> c
  arg Stream s ; arg CBool c
  c := (s:flags .and. crashed)<>0

method s recover
  arg_rw Stream s
  s:flags := s:flags .and. .not. (cast crashed Int)


method s error msg
  arg_rw Stream s ; arg Str msg
  s flags := s:flags .or. crashed
  s read_cur := s read_stop
  s write_cur := s write_stop
  if (s:flags .and. safe)=0
    error error_id_io msg+" ("+s:name+")"


if has_rewind

  method s rewind_read_required -> required
    arg_rw Stream s ; arg CBool required
    if (s:flags .and. rewind_is_active)=0
      return true
    var Pointer:StreamBuffer b1 :> s rewind_current
    if exists:b1
      var Pointer:StreamBuffer b2 :> s:rewind_buffers next s:rewind_current
      s rewind_current :> b2
      if exists:b2
        s read_cur := b2 start
        s read_stop := b2 stop
      else
        s read_cur := s read_buf
        s read_stop := s rewind_stop
      if s:rewind_count=0
        memory_free b1:start
        s rewind_size -= (cast b1:stop Int) .-. (cast b1:start Int)
        s:rewind_buffers remove b1
        if not exists:b2
          s flags := s:flags .and. .not. (cast rewind_is_active Int)
      required := s:read_cur=s:read_stop      
    else
      var Int size := (cast s:read_stop Int) .-. (cast s:read_buf Int)
      var Int skip := size
      for (var Int i) 0 s:rewind_count-1
        var Pointer:StreamMark m :> s:rewind_stack i
        skip := shunt (exists m:buffer) 0 (min skip m:offset)
      size -= skip
      while s:rewind_size+size>s:rewind_limit and s:rewind_size>0
        var Pointer:StreamBuffer b :> s:rewind_buffers first
        for (var Int i) 0 s:rewind_count-1
          var Pointer:StreamMark m :> s:rewind_stack i
          if (addressof m:buffer)=addressof:b
            m offset := undefined
        memory_free b:start
        s rewind_size -= (cast b:stop Int) .-. (cast b:start Int)
        s:rewind_buffers remove b
      if size>0
        s rewind_buffers += var StreamBuffer empty_buffer
        var Pointer:StreamBuffer b :> s:rewind_buffers last
        b start := memory_allocate size addressof:s
        memory_copy (s:read_buf translate Byte skip) b:start size
        b stop := b:start translate Byte size
        s rewind_size += size
        for (var Int i) 0 s:rewind_count-1
          var Pointer:StreamMark m :> s:rewind_stack i
          if not (exists m:buffer)
            m buffer :> b
            m offset -= skip
      else
        for (var Int i) 0 s:rewind_count-1
          var Pointer:StreamMark m :> s:rewind_stack i
          if not (exists m:buffer)
            m offset := 0
      required := true

  method s rewind_open
    arg_rw Stream s
    s flags := s:flags .or. rewind_is_active
    var Int count := s rewind_count
    s rewind_count := count+1
    if count>=s:rewind_stack:size
      s:rewind_stack size := count+1
    var Pointer:StreamMark m :> s:rewind_stack count
    m buffer :> s rewind_current
    var Address start
    if (exists s:rewind_current)
      start := s:rewind_current start
    else
      start := s read_buf
    m offset := (cast s:read_cur Int) .-. (cast start Int)

  method s rewind
    arg_rw Stream s
    check s:rewind_count>0
    var Pointer:StreamMark m :> s:rewind_stack s:rewind_count-1
    if m:offset=undefined
      s error "rewind capacity overflow"
      return
    var Pointer:StreamBuffer b :> m buffer
    if not (exists s:rewind_current)
      s rewind_stop := s read_stop
    s rewind_current :> b
    if exists:b
      s read_cur := b:start translate Byte m:offset
      s read_stop := b stop
    else
      s read_cur := s:read_buf translate Byte m:offset
    
  method s rewind_close
    arg_rw Stream s
    var Int count := s rewind_count
    check count>0
    count -= 1
    s rewind_count := count
    if count=0
      while { var Pointer:StreamBuffer b :> s:rewind_buffers first ; exists:b and addressof:b<>(addressof s:rewind_current) }
        memory_free b:start
        s rewind_size -= (cast b:stop Int) .-. (cast b:start Int)
        s:rewind_buffers remove b
      if not exists:b
        s flags := s:flags .and. .not. (cast rewind_is_active Int)


method s reset
  arg_rw Stream s
  s read_cur := null ; s read_stop := null
  s write_cur := null ; s write_stop := null
  memory_free s:read_buf ; s read_buf := null
  memory_free s:write_buf ; s write_buf := null
  s flags := s:flags .and. crashed
  s handle := undefined
  s driver :> null map StreamDriver
  s line_number := 0
  s line_limit := 2^16
  if has_rewind
    each b s:rewind_buffers
      memory_free b:start
    s rewind_buffers := var List:StreamBuffer empty_buffers_list
    s rewind_current :> null map StreamBuffer
    s:rewind_stack size := 0 ; s rewind_count := 0
    s rewind_size := 0
    s rewind_limit := 2^24


method s error msg
  arg_rw Stream s ; arg Str msg
  s flags := s:flags .or. crashed
  s read_cur := s read_stop
  s write_cur := s write_stop
  if (s:flags .and. safe)=0
    error error_id_io msg+" ("+s:name+")"


method s write_all_data address size
  arg_rw Stream s ; arg Address address ; arg Int size
  var Address adr := address
  var Int remain := size
  while remain>0
    var Int written := s:driver write adr remain remain
    check written>=0
    if written<=0
      s error "Failed to write to stream"
      return
    else
      s flags := s:flags .or. unflushed
      adr := adr translate Byte written
      remain := remain-written

method s flush level
  arg_rw Stream s ; arg Int level
  if (s:flags .and. out)=0
    s error "Attempted to flush an "+(shunt s:is_open "in" "unopened")+" stream"
  if s:is_crashed
    return
  if s:write_cur<>s:write_buf
    s write_all_data s:write_buf (cast s:write_cur Int).-.(cast s:write_buf Int)
    s write_cur := s write_buf
  if (s:driver flush level)=failure
    s error "Failed to flush stream ("+string:level+")"
  s flags := s:flags .andnot. unflushed
(the_function '. flush' Stream Int) extra_module :> the_module "/pliant/language/stream/flushmode.pli"


method s query command -> answer
  arg_rw Stream s ; arg Str command answer
  if s:is_open
    if (s:driver query command s answer)=failure
      answer := ""
      s error "Failed to query the stream (the query was "+command+")"
  else
    s error "Attempted to query an unopened stream"
    answer := ""

method s safe_query command -> answer
  arg_rw Stream s ; arg Str command answer
  if s:is_open
    if (s:driver query command s answer)=failure
      answer := ""
  else
    answer := ""

method s configure command -> status
  arg_rw Stream s ; arg Str command ; arg ExtendedStatus status
  if s:is_open
    status := s:driver configure command s
    if status=failure
      s error "Failed to configure the stream (the command was "+command+")"
  else
    s error "Attempted to configure an unopened stream"
    status := failure

method s safe_configure command -> status
  arg_rw Stream s ; arg Str command ; arg ExtendedStatus status
  if s:is_open
    status := s:driver configure command s
  else
    status := failure "Not an open stream"


method s raw_read address size
  arg_rw Stream s ; arg Address address ; arg Int size
  check size>=0
  if (s:read_cur translate Byte size)<=s:read_stop
    memory_copy s:read_cur address size
    s read_cur := s:read_cur translate Byte size
  else
    if (s:flags .and. in)=0
      s error "Attempted to read from an "+(shunt s:is_open "out" "unopened")+" stream"
    if (s:flags .and. noautopost)=0 and (s:write_cur<>s:write_buf or (s:flags .and. unflushed)<>0)
      s flush async
    var Address adr := address
    var Int remain := size
    while remain>0
      if s:is_crashed
        memory_clear adr remain
        return
      eif s:read_cur<>s:read_stop
        var Int step := min (cast s:read_stop Int).-.(cast s:read_cur Int) remain
        memory_copy s:read_cur adr step
        s read_cur := s:read_cur translate Byte step ; adr := adr translate Byte step ; remain := remain-step
      eif remain<s:read_buf_size\2
        var Int red := s:driver read s:read_buf remain s:read_buf_size
        check red>=0
        if red<=0
          s error "Failed to read from stream"
        s read_cur := s read_buf ; s read_stop := s:read_buf translate Byte red
      eif remain<s:read_buf_size\2 or has_rewind and (s:flags .and. rewind_is_active)<>0
        if not has_rewind or s:rewind_read_required
          var Int red := s:driver read s:read_buf remain s:read_buf_size
          check red>=0
          if red<=0
            s error "Failed to read from stream"
          s read_cur := s read_buf ; s read_stop := s:read_buf translate Byte red
      else
        var Int red := s:driver read adr remain remain
        check red>=0
        if red<=0
          s error "Failed to read from stream"
        adr := adr translate Byte red ; remain := remain-red


method s raw_write address size
  arg_rw Stream s ; arg Address address ; arg Int size
  check size>=0
  if (s:write_cur translate Byte size)<=s:write_stop
    memory_copy address s:write_cur size
    s write_cur := s:write_cur translate Byte size
  else
    if (s:flags .and. out)=0
      s error "Attempted to write to an "+(shunt s:is_open "in" "unopened")+" stream"
    if s:is_crashed
      return
    var Int more := min (cast s:write_stop Int).-.(cast s:write_cur Int) size
    check more>=0
    memory_copy address s:write_cur more
    s write_cur := s:write_cur translate Byte more
    s write_all_data s:write_buf (cast s:write_cur Int).-.(cast s:write_buf Int)
    if (s:flags .and. crashed)<>0
      return
    s write_cur := s write_buf
    var Address adr := address translate Byte more
    var Int remain := size-more
    check remain>=0
    while remain>s:write_buf_size\2
      var Int written := s:driver write adr remain-s:write_buf_size\2 remain
      check written>=0
      if written<=0
        s error "Failed to write to stream"
        return
      else
        s flags := s:flags .or. unflushed
        adr := adr translate Byte written ; remain := remain-written
    check remain>=0
    memory_copy adr s:write_cur remain
    s write_cur := s:write_cur translate Byte remain


method s close -> status
  arg_rw Stream s ; arg ExtendedStatus status
  if s:is_open
    if (s:flags .and. out)<>0
      s flush end
    status := s:driver close
    if status=failure
      s error "Failed to close stream"
    s reset
  else
    status := failure


method s open name options flags fs support -> status
  arg_rw Stream s ; arg Str name options ; arg Int flags ; oarg_rw FileSystem fs ; arg_rw Stream support ; arg ExtendedStatus status
  s close
  s name := name
  s flags := flags .or. (shunt (flags .and. append)=append out 0)
  var Int cs := shunt (flags .and. nocache)<>0 0 (flags .and. linecache)<>0 256 (flags .and. bigcache)<>0 65536 4096
  var Int cs := shunt (flags .and. nocache)<>0 0 (flags .and. linecache)<>0 2^8 (flags .and. bigcache)<>0 2^16 2^12
  s read_buf_size := cs
  s write_buf_size := cs
  status := fs open name options s:flags s support
  if status=failure and (s:flags .and. out+mkdir)=out+mkdir
    if not (name:len>0 and name:0="[dq]" and (name parse (var Str base) any:(var Str opt)))
      base := name ; opt := ""    
    var Int i := base:len
    while { i := (base 0 i) search_last "/" 0 ; i<>0 and (fs configure string:(base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir")=failure }
      void
    while { i := i+1 ; i := ((base i base:len) search "/" -i)+i ; i<>0 }
      fs configure string:(base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir"
    status := fs open name options s:flags s support
    if status=failure
      i := 0
      while { i := i+1 ; i := ((base i base:len) search "/" -i)+i ; i<>0 }
        fs configure (base 0 i+1) opt+(shunt opt:len<>0 and options:len<>0 " " "")+options "mkdir"
      status := fs open name options s:flags s support
  if status=success
    check (addressof s:driver)<>null
    if (s:flags .and. in)<>0
      s read_buf := memory_allocate s:read_buf_size addressof:s
      s read_cur := s read_buf
      s read_stop := s read_buf
    else
      s read_buf_size := 0
    if (s:flags .and. out)<>0
      s write_buf := memory_allocate s:write_buf_size addressof:s
      s write_cur := s write_buf
      s write_stop := s:write_buf translate Byte s:write_buf_size
    else
      s write_buf_size := 0
  else
    check (addressof s:driver)=null
    s flags := crashed+(flags .and. safe)
(the_function '. open' Stream Str Str Int FileSystem Stream -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"

method s open name flags -> status
  arg_rw Stream s ; arg Str name ; arg Int flags ; arg ExtendedStatus status
  status := s open name "" flags pliant_default_file_system (null map Stream)
(the_function '. open' Stream Str Int -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"
  
method s open name options flags-> status
  arg_rw Stream s ; arg Str name options ; arg Int flags ; arg ExtendedStatus status
  status := s open name options flags pliant_default_file_system (null map Stream)
(the_function '. open' Stream Str Str Int -> ExtendedStatus) extra_module :> the_module "/pliant/language/stream/openmode.pli"
  

method s atend -> ae
  arg_rw Stream s ; arg CBool ae
  if s:read_cur<>s:read_stop
    return false
  if (s:flags .and. in)=0
    s error "Attempted to test read end on an "+(shunt s:is_open "out" "unopened")+" stream"
  if (s:flags .and. crashed)<>0
    return true
  eif (s:flags .and. next_line_available)<>0
  eif has_unreadline and (s:flags .and. next_line_available)<>0
    return false
  else
    if (s:flags .and. noautopost)=0 and (s:write_cur<>s:write_buf or (s:flags .and. unflushed)<>0)
      s flush async
      if (s:flags .and. crashed)<>0
        return true
    var Int red := s:driver read s:read_buf 1 s:read_buf_size
    check red>=0
    s read_cur := s read_buf
    s read_stop := s:read_buf translate Byte red
    return red<=0
    if not has_rewind or s:rewind_read_required
      var Int red := s:driver read s:read_buf 1 s:read_buf_size
      check red>=0
      s read_cur := s read_buf
      s read_stop := s:read_buf translate Byte red
      return red<=0
    else
      return false


method s read_available address size
  arg_rw Stream s ; arg_w Address address ; arg_w Int size
  if not s:atend
    address := s read_cur ; size := (cast s:read_stop Int).-.(cast s:read_cur Int)
    s read_cur := s read_stop
  else
    address := null ; size := 0

method s read_available address size maxi
  arg_rw Stream s ; arg_w Address address ; arg_w Int size ; arg Int maxi
  if not s:atend
    address := s read_cur ; size := min (cast s:read_stop Int).-.(cast s:read_cur Int) maxi
    s read_cur := address translate Byte size
  else
    address := null ; size := 0


method s writechars chars
  arg_rw Stream s ; arg Str chars
  s raw_write chars:characters chars:len


method s readline -> l
  arg_rw Stream s ; arg Str l
  if (s:flags .and. next_line_available)<>0
  if has_unreadline and (s:flags .and. next_line_available)<>0
    l := s next_line
    s next_line := ""
    s flags := s:flags-next_line_available
    return
  l := ""
  if s:atend
    return
  s line_number := s:line_number+1
  while true
    if s:atend
      return
    var Int mode := s:flags .and. cr+lf
    var Address eol := memory_search s:read_cur (cast s:read_stop Int).-.(cast s:read_cur Int) (shunt mode<>cr "[lf]" "[cr]"):characters Char:size
    if  mode=0
      var Address eol_cr := memory_search s:read_cur (cast s:read_stop Int).-.(cast s:read_cur Int) "[cr]":characters Char:size
      var Address eol_lf := eol
      eol :=  shunt eol_cr<>null and (eol_lf=null or (cast eol_cr Int).-.(cast s:read_cur Int)<(cast eol_lf Int).-.(cast s:read_cur Int)) eol_cr eol_lf
    var Int extra := (cast (shunt eol<>null eol s:read_stop) Int) .-. (cast s:read_cur Int)
    check extra>=0
    if l:len+extra>s:line_limit
      s error "too long line"
      l := ""
      return
    l resize l:len+extra
    memory_copy s:read_cur (l:characters translate Char l:len-extra) extra
    if eol<>null
      s read_cur := eol translate Char 1
      if mode=cr+lf and l:len>0 and (l l:len-1)="[cr]"
        l resize l:len-1
      if mode=0 
        if eol=eol_cr and not s:atend and (s:read_cur map Char)="[lf]"
          s read_cur := s:read_cur translate Char 1
          mode := cr+lf
        eif eol=eol_lf
          mode := lf
        else
          check eol=eol_cr
          mode := cr
        if (s:flags .and. anyeol)=0
          s:flags := s:flags .or. mode
      return
    else
      s read_cur := s read_stop


method s unreadline l
  arg_rw Stream s ; arg Str l
  s next_line := l
  s flags := s:flags .or. next_line_available
if has_unreadline
  method s unreadline l
    arg_rw Stream s ; arg Str l
    s next_line := l
    s flags := s:flags .or. next_line_available


method s eol
  arg_rw Stream s
  var Int mode := s:flags .and. cr+lf
  var Str eol := shunt mode=0 or mode=lf "[lf]" mode=cr "[cr]" "[cr][lf]"
  s raw_write eol:characters eol:len
  if (s:flags .and. linecache)<>0
    s flush async


method s writeline l
  arg_rw Stream s ; arg Str l
  s writechars l
  s eol


function raw_copy src dest mini maxi -> copied
  arg_rw Stream src dest ; arg Int mini maxi copied
  # should use the 2.2 sendfile function when the Linux kernel is >=2.2 and both streams have a handle
  copied := 0
  while copied<mini
    src read_available (var Address adr) (var Int size) maxi-copied
    if size=0
      return
    dest raw_write adr size
    copied += size


function build s
  arg_w Stream s
  s read_buf := null
  s write_buf := null
  s flags := 0
  s reset
  s:list next :> s list

function destroy s
  arg_w Stream s
  s close

export '. open' '. close' '. raw_read' '. raw_write' '. flush' '. query' '. configure' '. safe_query' '. safe_configure'
export '. writechars' '. readline' '. unreadline' '. eol' '. writeline'
export '. writechars' '. readline' '. eol' '. writeline'
if has_unreadline
  export '. unreadline'
export '. atend' '. read_available' '. error'
export raw_copy


#----------------------------------------------------------------------


function 'cast Status' s -> stat
  arg Stream s ; arg Status stat
  explicit
  stat := shunt s:is_open and not s:is_crashed success failure

export 'cast Status'


alias '. stream_handle' '. handle'
alias '. stream_flags' '. flags'
alias '. stream_driver' '. driver'
alias '. stream_read_buf' '. read_buf'
alias '. stream_read_buf_size' '. read_buf_size'
alias '. stream_read_cur' '. read_cur'
alias '. stream_read_stop' '. read_stop'
alias '. stream_write_buf' '. write_buf'
alias '. stream_write_buf_size' '. write_buf_size'
alias '. stream_write_cur' '. write_cur'
alias '. stream_write_stop' '. write_stop'
export '. stream_handle' '. stream_flags' '. stream_driver'
export '. stream_read_buf' '. stream_read_buf_size' '. stream_read_cur' '. stream_read_stop'
export '. stream_write_buf' '. stream_write_buf_size' '. stream_write_cur' '. stream_write_stop'
if has_rewind
  export '. rewind_limit'
  export '. rewind_open' '. rewind' '. rewind_close'

alias stream_flag_unflushed unflushed
alias stream_flag_crashed crashed
export stream_flag_unflushed stream_flag_crashed

export '. name' '. is_open' '. is_crashed' '. recover'