Patch title: Release 94 bulk changes
Abstract:
File: /pliant/storage/ground/control.pli
Key:
    Removed line
    Added line
module "/pliant/language/compiler.pli"
module "/pliant/language/context.pli"
module "/pliant/admin/file.pli"
module "/pliant/language/stream.pli"
module "/pliant/util/pml/io.pli"
module "/pliant/language/compiler/type/inherit.pli"
module "/pliant/language/data/cache.pli"
module "/pliant/language/schedule/daemon.pli"
module "resolve.pli"


method t field_offset f -> o
  arg Type t ; arg Str f ; arg Int o
  for (var Int i) 0 t:nb_fields-1 
    if (t field i):name=f
      return (t field i):offset
  error error_id_missing "Type "+t:name+" does not have a "+f+" field."


#--------------------------------------------------------------------------


public

  gvar Int storage_handles_limit := 25
  gvar Sem storage_control_type_sem
  gvar Dictionary storage_control_types

  type StorageFiber
    field Str type_name
    field Link:Type type
    field Intn dump_offset # defines the offset in the full log where changes start after dump (undefined means no changes futher than dump)

  type StorageHost
    field Str name
    field Intn sync_offset # we have the same full log up to specified offset

  type StorageControl
    inherit CachePrototype
    field Stream stream
    field Sem sem
    field Str path
    field Intn discard_offset <- 0 # defines how many bytes of the full log have been discarded
    # field Intn end_offset
    field (Dictionary Str StorageFiber) fiber
    field Intn dump_step <- 4096
    field (Dictionary Str StorageHost) host
    field CBool current_active <- false
    field Str current_fiber
    field Str current_user
    field DateTime current_timestamp
    field ListNode_ modified_node

  CachePrototype maybe StorageControl
  
  gvar Sem storage_control_modified_sem
  gvar List_ storage_control_modified_list
  gvar Int storage_control_modified_count := 0


method c update s -> status
  oarg_rw StorageControl c ; arg_rw Stream s ; arg ExtendedStatus status
  c:sem request
  status := success
  if (s itag "discard_offset" c:discard_offset)
    void
  eif (s itag "host_create" (var Str name))
    var Pointer:StorageHost psh :> c:host first name
    if exists:psh
      c:host remove psh
    var StorageHost sh := var StorageHost empty_host
    sh sync_offset := 0
    c:host insert name sh
  eif (s itag "host_delete" (var Str name))
    var Pointer:StorageHost psh :> c:host first name
    if exists:psh
      c:host remove psh
  eif (s itag "host_offset" (var Str name) (var Intn offset))
    var Pointer:StorageHost psh :> c:host first name
    if exists:psh
      psh sync_offset := offset
  eif (s itag "fiber_create" (var Str name) (var Str typename))
    var Pointer:StorageFiber psf :> c:fiber first name
    if exists:psf
      c:fiber remove psf
    var StorageFiber sf := var StorageFiber empty_fiber
    sf type_name := typename
    storage_control_type_sem rd_request
    sf type :> (storage_control_types first typename) map Type
    storage_control_type_sem rd_release
    sf dump_offset := undefined
    c:fiber insert name sf
  eif (s itag "fiber_delete" (var Str name))
    var Pointer:StorageFiber psf :> c:fiber first name
    if exists:psf
      c:fiber remove psf
  eif (s itag "fiber_dump" (var Str name) (var Intn offset))
    var Pointer:StorageFiber psf :> c:fiber first name
    if exists:psf
      psf dump_offset := offset
  else
    status := failure "unsupported instruction"
  c:sem release
  

method c fiber_create name typename -> status
  oarg_rw StorageControl c ; arg Str name typename ; arg ExtendedStatus status
  c:sem request
  (var Stream s) open c:path+"control" append+mkdir+safe
  if s=failure
    c:sem release
    return failure:"open"
  s safe_configure "journal"
  s otag "fiber_create" name typename
  if s:close=failure
    c:sem release
    return failure:"close"
  var Pointer:StorageFiber psf :> c:fiber first name
  if exists:psf
    c:fiber remove psf
  var StorageFiber sf := var StorageFiber empty_fiber
  sf type_name := typename
  storage_control_type_sem rd_request
  sf type :> (storage_control_types first typename) map Type
  storage_control_type_sem rd_release
  sf dump_offset := undefined
  c:fiber insert name sf
  c:sem release
  status := success


function call_sleep object param
  oarg_rw CachePrototype object ; arg_rw Universal param
  object sleep

function storage_sleep_daemon
  daemon "storage sleep daemon"
    while not daemon_emergency
      daemon_sleep 5
      cache_broadcast (the_function call_sleep CachePrototype Universal) (var Universal drop)


function storage_control_daemon
  daemon "storage control daemon"
    while storage_control_modified_count>0 and not daemon_emergency
      daemon_sleep 5
      storage_control_modified_sem request
      var Pointer:ListNode_ n :> storage_control_modified_list first
      while exists:n
        constant modified_node_offset (StorageControl field_offset "modified_node")
        var Link:StorageControl c :> (addressof:n translate Byte -modified_node_offset) map StorageControl
        c current_active := false
        if c:sem:nowait_request
          c:stream close
          c:sem release
          n :> storage_control_modified_list remove n
          storage_control_modified_count -= 1
        else
          n :> n next
      storage_control_modified_sem release


method c do_dump f site category object fiber data -> status
  oarg_rw StorageControl c ; arg_rw StorageFiber f ; arg Str site category object fiber ; oarg_rw CachePrototype data ; arg Status status
  status := failure
  (var Stream s) open c:path+"temp_"+fiber out+safe
  if (data dump s)=success and s:close=success
    # console "dump " c:path+"temp"+fiber " ok" eol
    file_delete c:path+"dump_"+fiber
    if (file_move c:path+"temp_"+fiber c:path+"dump_"+fiber)=success
      (var Stream s) open c:path+"control" append+mkdir+safe
      if s=success
        s safe_configure "journal"
        s otag "fiber_dump" fiber (cast undefined Intn)
        if s:close=failure
          f dump_offset := undefined
          status := success
  else
    # console "dump " c:path+"temp_"+fiber " failed" eol
    s close
    file_delete c:path+"temp_"+fiber

method c fiber_dump fiber -> status
  oarg_rw StorageControl c ; arg Str fiber ; arg Status status
  c:sem request
  var Pointer:StorageFiber f :> c:fiber first fiber
  if not exists:f
    status := failure
  eif f:dump_offset=undefined
    status := success
  else
    status := failure
    if (c:id eparse "/pliant/control/" any:(var Str site) "/" any:(var Str category) "/" any:(var Str object))
      if (cache_search "/pliant/fiber/"+site+"/"+category+"/"+object+"/"+fiber (var Link:CachePrototype data))
        status := c do_dump f site category object fiber data
  c:sem release


method c fiber_modify fiber -> status
  oarg_rw StorageControl c ; arg Str fiber ; arg Status status
  check not c:sem:nowait_rd_request
  var Pointer:StorageFiber f :> c:fiber first fiber
  if not exists:f
    return failure
  if f:dump_offset=undefined
    if c:stream:is_open
      c:stream flush anytime
    var FileInfo log_info := file_query c:path+"log" standard
    f dump_offset := c:discard_offset+(shunt log_info=success log_info:size 0)
    (var Stream s) open c:path+"control" append+mkdir+safe
    if s=failure
      return failure
    s safe_configure "journal"
    s otag "fiber_dump" fiber f:dump_offset
    if s:close=failure
      return failure
  eif not c:current_active
    if (c:id eparse "/pliant/control/" any:(var Str site) "/" any:(var Str category) "/" any:(var Str object))
      if (cache_search "/pliant/fiber/"+site+"/"+category+"/"+object+"/"+fiber (var Link:CachePrototype data))
        var FileInfo log_info := file_query c:path+"log" standard
        var Intn current_offset := c:discard_offset+(shunt log_info=success log_info:size 0)
        var FileInfo dump_info := file_query c:path+"dump_"+fiber standard
        var Intn dump_size := shunt dump_info=success dump_info:size 0
        if current_offset-f:dump_offset>=dump_size+c:dump_step
          c do_dump f site category object fiber data
  if not c:stream:is_open
    if storage_control_modified_count>=storage_handles_limit
      storage_control_modified_sem request
      var Pointer:ListNode_ n :> storage_control_modified_list first
      while exists:n and storage_control_modified_count>=storage_handles_limit
        constant modified_node_offset (StorageControl field_offset "modified_node")
        var Link:StorageControl pc :> (addressof:n translate Byte -modified_node_offset) map StorageControl
        if pc:sem:nowait_request
          pc current_active := false
          pc:stream close
          pc:sem release
          n :> storage_control_modified_list remove n
          storage_control_modified_count -= 1
        else
          n :> n next
      storage_control_modified_sem release
    status := c:stream open c:path+"log" append+safe
    if status=success
      c:stream safe_configure "journal"
      storage_control_modified_sem request
      storage_control_modified_list append c:modified_node
      storage_control_modified_count += 1
      storage_control_modified_sem release
      storage_control_daemon
  else
    status := success

method c fiber_modify_begin fiber user -> status
  oarg_rw StorageControl c ; arg Str fiber user ; arg Status status
  c:sem request
  if (c fiber_modify fiber)=failure
    c:sem release
    return failure
  if c:current_active and fiber=c:current_fiber and user=c:current_user
    c:stream oraw open (cast "f" Ident) body
  else
    c:stream oraw open (cast "fiber" Ident) fiber
    if true
      c:stream oraw (cast "host" Ident) computer_fullname
    if user<>""
      c:stream oraw (cast "user" Ident) user
    var DateTime ts := datetime
    c:stream oraw (cast "ts" Ident) ts
    c:stream oraw body
    c current_active := true
    c current_fiber := fiber
    c current_user := user
    c current_timestamp := ts
  status := success

method c fiber_modify_end
  oarg_rw StorageControl c
  c:stream oraw close
  c:stream flush anytime
  c:sem release


function storage_control site category object -> control
  arg Str site category object ; arg Link:StorageControl control
  if (cache_open "/pliant/control/"+site+"/"+category+"/"+object StorageControl ((addressof Link:StorageControl control) map Link:CachePrototype))
    control path := resolve_path site category object
    (var Stream s) open control:path+"control" in+safe
    while s:iavailable
      if (control update s)=failure
        if s:iskip=failure
          cache_cancel ((addressof Link:StorageControl control) map Link:CachePrototype)
          control :> null map StorageControl
          return
    # var FileInfo log_info := file_query control:path+"log" standard
    # control end_offset := control:discard_offset+(shunt log_info=success log_info:size 0)
    cache_ready ((addressof Link:StorageControl control) map Link:CachePrototype)
    storage_sleep_daemon


export storage_control '. fiber_create' '. fiber_modify_begin' '. fiber_modify_end'
export '. fiber_dump'