Patch title: Release 85 bulk changes
Abstract:
File: /pliant/appli/cluster/daemon.pli
Key:
    Removed line
    Added line
module "/pliant/language/compiler.pli"
module "/pliant/language/context.pli"
module "/pliant/language/stream.pli"
module "database.pli"
module "common.pli"
module "/pliant/protocol/dns/name.pli"
module "/pliant/util/remote/common.pli"
module "/pliant/util/encoding/html.pli"
module "/pliant/util/crypto/channel.pli"
module "/pliant/language/schedule/daemon.pli"
module "/pliant/admin/file.pli"

constant verbose_data false
constant debug_data false
constant verbose_file false
constant debug_file false
constant verbose_report false
constant sleep_interval 0.01


method d count -> c
  arg Data_ d ; arg Int c
  c := 1
  var Data_ cur := d:interface first_to_store d "" "" (var DataScanBuffer buf)
  while cur:adr<>null
    c += cur count
    cur := d:interface next_to_store d "" "" buf


type ClusterReport
    field Int file_total_count <- 0 ; field Intn file_total_size <- 0
    field Int file_append_count <- 0 ; field Intn file_append_size <- 0
    field Int file_modified_count  <- 0; field Intn file_modified_size <- 0
    field Int file_deleted_count <- 0 ; field Intn file_deleted_size <- 0
    field Int data_total <- 0
    field Int data_append <- 0
    field Int data_modified <- 0
    field Int data_deleted <- 0


type ClusterPoint
  field ListNode_ node
  field Str src dest ; field Data_ data
  field Str src dest
  field Int action
  field Str sign ; field Int count ; field Intn size
  field CBool created

type ClusterQueue
  field List_ list
  field FastSem sem

function new_point -> p
  arg_RW ClusterPoint p
  p :> (memory_allocate ClusterPoint:size null) map ClusterPoint
  ClusterPoint build_instance addressof:p

function drop_point p
  arg_rw ClusterPoint p
  ClusterPoint destroy_instance addressof:p
  memory_free addressof:p

method q push p
  arg_rw ClusterQueue q ; arg_rw ClusterPoint p
  q:sem request
  q:list append p:node
  q:sem release

method q pop_first -> p
  arg_rw ClusterQueue q ; arg_RW ClusterPoint p
  q:sem request
  p :> (addressof q:list:first) map ClusterPoint
  if exists:p
    q:list remove p:node
  q:sem release

method q pop_last -> p
  arg_rw ClusterQueue q ; arg_RW ClusterPoint p
  q:sem request
  p :> (addressof q:list:last) map ClusterPoint
  if exists:p
    q:list remove p:node
  q:sem release


function data_sync s source area src_path dest_path mode0 filter sign_limit r status
  arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea area ; arg Str src_path dest_path ; arg Int mode0 ; arg Function filter ; arg Int sign_limit ; arg_rw ClusterReport r ; arg_rw ExtendedStatus status
  var Int mode := mode0
  var Link:Function time_filter :> cluster_filter area "time" (var Str err)
  var ClusterQueue query_queue answer_queue
  var Pointer:ClusterPoint extra :> new_point
  extra src := src_path
  extra data := data_root search_path dest_path true
  extra sign := data_sign extra:data filter sign_limit extra:count
  extra dest := dest_path
  var Data_ data := data_root search_path dest_path true
  extra sign := data_sign data filter sign_limit extra:count
  extra action := shunt extra:count>1 0 1
  extra created := false
  query_queue push extra
  var Int pending := 1
  var Int running := 1
  thread # send requests
    share s pending running
    share query_queue answer_queue
    while pending>0
      var Pointer:ClusterPoint p :> query_queue pop_last
      if exists:p
        if verbose_data
          console (shunt p:action=0 "dsign " "dread ")+(string p:src) eol
        s writeline (shunt p:action=0 "dsign " "dread ")+(string p:src)
        answer_queue push p
      else
        s flush anytime
        sleep sleep_interval
    atomic_add running -1
  while pending>0 # read answers
    time_filter_prototype (var CBool start) (var CBool run) time_filter
    if not run and mode>1
      mode := 1
    var Pointer:ClusterPoint p :> answer_queue pop_first
    if exists:p
      var Data_ data := data_root search_path p:dest true
      var CBool drop := true
      var Str l := s readline
      if p:action=0
        if (l parse (var Str sign)) and sign=p:sign
          r data_total += p count
        else
          p action := 1
          query_queue push p ; drop := false
      eif p:action=1
        if (l parse (var Str value))
          p:data:base:sem request
          data:base:sem request
          r data_total += 1
          if p:created
            r data_append += 1
            if (data_filter_prototype p:data value source computer_fullname mode filter)>=2
              if (p:data:interface set p:data (addressof value) Str)=failure
                status := failure "Failed to write data "+p:data:path
            if (data_filter_prototype data value source computer_fullname mode filter)>=2
              if (data:interface set data (addressof value) Str)=failure
                status := failure "Failed to write data "+data:path
                if debug_data
                  console "Failed to write data " p:data:path eol
                  console "Failed to write data " data:path eol
          else
            if (p:data:interface get p:data addressof:(var Str current) Str)=success
            if (data:interface get data addressof:(var Str current) Str)=success
              if value<>current
                r data_modified += 1
                if (data_filter_prototype p:data value source computer_fullname mode filter)>=3
                  if (p:data:interface set p:data (addressof value) Str)=failure
                    status := failure "Failed to write data "+p:data:path
                if (data_filter_prototype data value source computer_fullname mode filter)>=3
                  if (data:interface set data (addressof value) Str)=failure
                    status := failure "Failed to write data "+data:path
                    if debug_data
                      console "Failed to write data " p:data:path eol
                      console "Failed to write data " data:path eol
            else
              status := failure "Failed to read data "+p:data:path
              status := failure "Failed to read data "+data:path
              if debug_data
                console "Failed to read data " p:data:path eol
          p:data:base:sem release
          data:base:sem release
        var (Dictionary Str Int) list := var (Dictionary Str Int) empty_dict
        while (s:readline parse (var Str key))
          list insert key 0
        each k list
          var Pointer:ClusterPoint extra :> new_point
          extra src := p:src+"/"+html_encode:(list key k) 
          p:data:base:sem rd_request
          extra data := p:data:interface search p:data (list key k)
          var CBool include := (data_filter_prototype extra:data "" source computer_fullname mode filter)>0
          p:data:base:sem rd_release
          extra dest := p:dest+"/"+html_encode:(list key k) 
          data:base:sem rd_request
          var Data_ data2 := data:interface search data (list key k)
          var CBool include := (data_filter_prototype data2 "" source computer_fullname mode filter)>0
          data:base:sem rd_release
          if include
            extra created := p:created or extra:data:adr=null
            if extra:data:adr=null and (data_filter_prototype extra:data "" source computer_fullname mode filter)>=2
              p:data:base:sem request
              p:data:interface create p:data (list key k)
              extra data := p:data:interface search p:data (list key k)
              p:data:base:sem release
            extra created := p:created or data2:adr=null
            if data2:adr=null and (data_filter_prototype data2 "" source computer_fullname mode filter)>=2
              data:base:sem request
              data:interface create data (list key k)
              data:base:sem release
            if not extra:created
              extra sign := data_sign extra:data filter sign_limit extra:count
              extra sign := data_sign data2 filter sign_limit extra:count
              extra action := shunt extra:count>1 0 1
            else
              extra action := 1
            query_queue push extra
            atomic_add pending 1
          else
            drop_point extra
        if not p:created and s=success
          p:data:base:sem rd_request
          var Int size := p:data:interface count p:data "" ""
          p:data:base:sem rd_release
          data:base:sem rd_request
          var Int size := data:interface count data "" ""
          data:base:sem rd_release
          if list:size<size
            p:data:base:sem request
            var Data_ cur := p:data:interface first p:data "" "" (var DataScanBuffer buf)
            data:base:sem request
            var Data_ cur := data:interface first data "" "" (var DataScanBuffer buf)
            while cur:adr<>null
              if not exists:(list first cur:key)
                if (data_filter_prototype cur "" source computer_fullname mode filter)>0
                  r data_deleted += cur count
                  if (data_filter_prototype cur "" source computer_fullname mode filter)>=4
                    p:data:interface delete p:data cur:key
              cur := p:data:interface next p:data "" "" buf
            p:data:base:sem release
                    data:interface delete data cur:key
              cur := data:interface next data "" "" buf
            data:base:sem release
      if drop
        drop_point p
        atomic_add pending -1
    else
      sleep sleep_interval
  while running>0
    sleep sleep_interval


function file_sync s source area src_path dest_path mode0 filter sign_limit r status
  arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea area ; arg Str src_path dest_path ; arg Int mode0 ; arg Function filter ; arg Int sign_limit ; arg_rw ClusterReport r ; arg_rw ExtendedStatus status
  var Int mode := mode0
  var Link:Function time_filter :> cluster_filter area "time" (var Str err)
  var CBool clear := area clear
  var ClusterQueue query_queue answer_queue
  var Pointer:ClusterPoint extra :> new_point
  extra src := src_path
  extra dest := dest_path
  extra sign := file_sign dest_path filter sign_limit extra:count extra:size
  extra action := shunt extra:sign<>"" 0 1
  query_queue push extra
  var Int pending := 1
  var Int running := 1
  thread # send requests
    share s pending running
    share query_queue answer_queue
    while pending>0
      var Pointer:ClusterPoint p :> query_queue pop_last
      if exists:p
        if verbose_file
          console (shunt p:action=0 "fsign " p:action=1 "flist " "fread ")+(string p:src)+(shunt p:action=2 and clear " clear" "") eol
        s writeline (shunt p:action=0 "fsign " p:action=1 "flist " "fread ")+(string p:src)+(shunt p:action=2 and clear " clear" "")
        answer_queue push p
      else
        s flush anytime
        sleep sleep_interval
    atomic_add running -1
  while pending>0 # read answers
    time_filter_prototype (var CBool start) (var CBool run) time_filter
    if not run and mode>1
      mode := 1
    var Pointer:ClusterPoint p :> answer_queue pop_first
    if exists:p
      var CBool drop := true
      if p:action=0
        var Str l := s readline
        if (l parse (var Str sign)) and sign=p:sign
          r file_total_count += p count ; r file_total_size += p size
        else
          p action := 1
          query_queue push p ; drop := false
      eif p:action=1
        var (Dictionary Str Int) src_list := var (Dictionary Str Int) empty_dict
        var (Dictionary Str Int) dest_list := var (Dictionary Str Int) empty_dict
        var Array:FileInfo files := file_list p:dest extended+relative+directories+deadlinks
        for (var Int i) 0 files:size-1
          dest_list insert files:i:name i
        while (s:readline parse (var Str name) (var Intn size) (var DateTime timestamp) any:(var Str options))
          if (file_filter_prototype p:dest+name source computer_fullname mode filter)>0
            var Pointer:Int ptr_dest :> dest_list first name
            var Pointer:FileInfo info
            if exists:ptr_dest
              info :> files ptr_dest
            else
              info :> null map FileInfo
            if (options option "link")
              r file_total_count += 1
              if not exists:info
                r file_append_count += 1
                if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=2
                  file_link "file:"+(options option "link" Str) p:dest+name
              eif info:link<>(options option "link" Str)
                console info:link " -> " (options option "link" Str) eol
                r file_modified_count += 1
                if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3
                  file_delete p:dest+name
                  file_link "file:"+(options option "link" Str) p:dest+name
            eif name:len>0 and (name name:len-1)="/"
              if exists:info and (info:is_link or not info:is_directory)
                if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3
                  file_delete p:dest+name
              var Pointer:ClusterPoint extra :> new_point
              extra src := p:src+name
              extra dest := p:dest+name
              extra sign := file_sign extra:dest filter sign_limit extra:count extra:size
              extra action := shunt extra:sign<>"" 0 1
              query_queue push extra
              atomic_add pending 1
            else # simple file
              r file_total_count += 1 ; r file_total_size += size
              if not exists:info
                r file_append_count += 1 ; r file_append_size += size
                if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=2
                  var Pointer:ClusterPoint extra :> new_point
                  extra src := p:src+name
                  extra dest := p:dest+name
                  extra sign := "datetime "+(string timestamp)+" "+options
                  extra action := 2
                  query_queue push extra
                  atomic_add pending 1
              eif info:size<>size or info:datetime<>timestamp
                r file_modified_count += 1 ; r file_modified_size += size
                if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3
                  if info:is_link
                    file_delete p:dest+name
                  eif info:is_directory
                    file_tree_delete p:dest+name
                  var Pointer:ClusterPoint extra :> new_point
                  extra src := p:src+name
                  extra dest := p:dest+name
                  extra sign := "datetime "+(string timestamp)+" "+options
                  extra action := 2
                  query_queue push extra
                  atomic_add pending 1
          src_list insert name 0
        if s=success
          for (var Int i) 0 files:size-1
            if not exists:(src_list first files:i:name)
              if (file_filter_prototype p:dest+files:i:name source computer_fullname mode filter)>0
                if files:i:is_link
                  r file_deleted_count += 1
                eif files:i:is_directory
                  var Array:FileInfo files2 := file_list p:dest+files:i:name extended+recursive
                  for (var Int j) 0 files2:size-1
                    if not files2:j:is_directory and not files2:j:is_link
                      r file_deleted_count += 1 ; r file_deleted_size += files2:size
                else
                  r file_deleted_count += 1 ; r file_deleted_size += files:i:size
                if (file_filter_prototype p:dest+files:i:name source computer_fullname mode filter)>=4
                  if files:i:is_link
                    file_delete p:dest+files:i:name
                  eif files:i:is_directory
                    file_tree_delete p:dest+files:i:name
                  else
                    file_delete p:dest+files:i:name
      eif p:action=2
        var Str l := s readline
        if (l parse (var Intn remain))
          var Pointer:Stream s2 :> s
          if clear
            s2 :> s channel_support
          var Str ext := ".tmp"
          if p:dest:len>0 and (p:dest p:dest:len-1)="."
            ext := "tmp"
          if (file_query p:dest+ext standard)=defined
            ext := ""
          (var Stream file) open p:dest+ext out+safe+mkdir
          part copy "receive "+p:dest
            while remain>0
              var Int step
              if remain>=2^24
                step := 2^24
              else
                step := remain
              step := raw_copy s2 file step step
              remain -= step
              if step=0
                leave copy
          var CBool write_ok := remain=0
          var CBool close_ok := file:close=success
          part fill
            while remain<>0
              var Int step
              if remain>=2^24
                step := 2^24
              else
                step := remain
              s2 read_available (var Address adr) (var Int step) step
              remain -= step
              if step=0
                leave fill
          var CBool read_ok := s:readline="ok"
          if read_ok and write_ok and close_ok
            file_configure p:dest+ext p:sign
            if ext<>""
              file_delete p:dest
              file_move p:dest+ext p:dest
          else
            file_delete p:dest+ext
            status := failure "Failed to "+(shunt not read_ok "read" "write")+" file "+p:dest
            if debug_data
              console "Failed to "+(shunt ok "read" "write")+" file "+p:dest eol
      if drop
        drop_point p
        atomic_add pending -1
    else
      sleep sleep_interval
  while running>0
    sleep sleep_interval


function cluster_sync areaid path mode0 -> status
  arg Str areaid path mode0 ; arg ExtendedStatus status
  # 'path' is related to the destination side
  var Int mode := shunt mode0="count" 1 mode0="append" 2 mode0="modify" 3 mode0="delete" 4 0
  var Data:ClusterArea area :> cluster_database:data:area areaid
  if not exists:area
    return (failure "There is no '"+areaid+"' clustering area.")
  var Str err := ""
  each m area:module
    pliant_compiler_semaphore request
    if not exists:(pliant_load_module m the_module:"/pliant/language/basic/safe.pli" 0 (null map Module))
      err := "Failed to compile module "+m
    pliant_compiler_semaphore release
  if err<>""
    area error := err
    return failure:err
  var Link:Function file_filter :> cluster_filter area "dest_file" (var Str err)
  if err<>""
    area error := err
    return failure:"Failed to compile file filtering formula"
  var Link:Function data_filter :> cluster_filter area "dest_data" (var Str err)
  if err<>""
    area error := err
    return failure:"Failed to compile data filtering formula"
  area start := datetime
  area stop := undefined
  area status := "R"
  var ClusterReport r
  status := success
  each server area:src_computer
    var Int port := (name_database:data:host keyof:server) remote_port
    var Stream s
    var ExtendedStatus os := s open "zchannel://"+keyof:server+"/site/"+(string (shunt port=defined port remote_tcp_port)+1)+"/"+computer_fullname in+out+noautopost+bigcache+safe
    if os=success
      if area:timeout=defined
        s configure "timeout "+(string area:timeout)
      s writeline "area "+string:areaid ; s flush anytime
      var Str l := s readline
      if l="ok"
        each p area:data_path
          if p:src<>"" and p:dest<>"" # syncing all databases at once is probably a very bad idea
            if (p:dest 0 path:len)=path
              data_sync s keyof:server area p:src p:dest mode data_filter area:data_sign_limit r status
            eif (path 0 p:dest:len)=p:dest
              data_sync s keyof:server area p:src+(path p:dest:len path:len) path mode data_filter area:data_sign_limit r status
        each p area:file_path
          if p:src<>"" and p:dest<>""
            if (p:dest 0 path:len)=path
              file_sync s keyof:server area p:src p:dest mode file_filter area:file_sign_limit r status
            eif (path 0 p:dest:len)=p:dest
              file_sync s keyof:server area p:src+(path p:dest:len path:len) path mode file_filter area:file_sign_limit r status
      else
        status := failure "'"+keyof:server+"' "+(shunt l="error" "reported internal error" "has no '"+areaid+"' clustering area")
    else
      status := failure "Failed to connect to '"+keyof:server+"' ("+os:message+")"
  area error := shunt status=success "" status:message
  area file_total_count := r file_total_count ; area file_total_size := r file_total_size
  area file_append_count := r file_append_count ; area file_append_size := r file_append_size
  area file_modified_count := r file_modified_count ; area file_modified_size := r file_modified_size
  area file_deleted_count := r file_deleted_count ; area file_deleted_size := r file_deleted_size
  area data_total := r data_total
  area data_append := r data_append
  area data_modified := r data_modified
  area data_deleted := r data_deleted
  area stop := datetime
  area status := " "
  var Link:Function post :> cluster_filter area "post" (var Str err)
  post_process_prototype area post
  if verbose_report
    console "data total " area:data_total " append " area:data_append " modified " area:data_modified " deleted " area:data_deleted eol
    console "file total " area:file_total_count ":" area:file_total_size " append " area:file_append_count ":" area:file_append_size " modified " area:file_modified_count ":" area:file_modified_size " deleted " area:file_deleted_count ":" area:file_deleted_size eol
    console (shunt status=success "done" "failed ("+status:message+")")+" in " (string area:stop:seconds-area:start:seconds "fixed 0") " seconds" eol


function cluster_daemon
  daemon "clustering daemon"
    while not daemon_emergency
      each area cluster_database:data:area filter (exists area:dest_computer:computer_fullname) and area:mode<>""
        if area:status=" " and area:frequency=defined and datetime:seconds-area:stop:seconds>area:frequency
          var Link:Function filter :> cluster_filter area "time" (var Str err)
          time_filter_prototype (var CBool start) (var CBool run) filter
          if start and run
            area status := "R"
            thread
              part sync "synchronise area "+keyof:area
                cluster_sync keyof:area "" area:mode              
      daemon_sleep 15

export cluster_sync cluster_daemon