Patch title: Release 85 bulk changes
Abstract:
File: /pliant/appli/cluster/daemon.pli
Key:
    Removed line
    Added line
   
module "/pliant/language/compiler.pli"
module "/pliant/language/context.pli"
module "/pliant/language/stream.pli"
module "database.pli"
module "common.pli"
module "/pliant/protocol/dns/name.pli"
module "/pliant/util/remote/common.pli"
module "/pliant/util/encoding/html.pli"
module "/pliant/util/crypto/channel.pli"
module "/pliant/language/schedule/daemon.pli"
module "/pliant/admin/file.pli"


type ClusterPoint
  field ListNode_ node
module "/pliant/language/compiler.pli"
module "/pliant/language/context.pli"
module "/pliant/language/stream.pli"
module "database.pli"
module "common.pli"
module "/pliant/protocol/dns/name.pli"
module "/pliant/util/remote/common.pli"
module "/pliant/util/encoding/html.pli"
module "/pliant/util/crypto/channel.pli"
module "/pliant/language/schedule/daemon.pli"
module "/pliant/admin/file.pli"


type ClusterPoint
  field ListNode_ node
  field Str src dest ; field Data_ data
  field Str src dest
  field Int action
  field Str sign ; field Int count ; field Intn size
  field CBool created


  field Int action
  field Str sign ; field Int count ; field Intn size
  field CBool created



function data_sync s source area src_path dest_path mode0 fi
  arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea
  var Int mode := mode0
  var Link:Function time_filter :> cluster_filter area "time
  var ClusterQueue query_queue answer_queue
  var Pointer:ClusterPoint extra :> new_point
  extra src := src_path
function data_sync s source area src_path dest_path mode0 fi
  arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea
  var Int mode := mode0
  var Link:Function time_filter :> cluster_filter area "time
  var ClusterQueue query_queue answer_queue
  var Pointer:ClusterPoint extra :> new_point
  extra src := src_path
  extra data := data_root search_path dest_path true
  extra sign := data_sign extra:data filter sign_limit extra
  extra dest := dest_path
  var Data_ data := data_root search_path dest_path true
  extra sign := data_sign data filter sign_limit extra:count
  extra action := shunt extra:count>1 0 1
  extra created := false
  query_queue push extra
  var Int pending := 1
  var Int running := 1
  thread # send requests
    share s pending running
    share query_queue answer_queue
    while pending>0
      var Pointer:ClusterPoint p :> query_queue pop_last
      if exists:p
        if verbose_data
          console (shunt p:action=0 "dsign " "dread ")+(stri
        s writeline (shunt p:action=0 "dsign " "dread ")+(st
        answer_queue push p
      else
        s flush anytime
        sleep sleep_interval
    atomic_add running -1
  while pending>0 # read answers
    time_filter_prototype (var CBool start) (var CBool run) 
    if not run and mode>1
      mode := 1
    var Pointer:ClusterPoint p :> answer_queue pop_first
    if exists:p
  extra action := shunt extra:count>1 0 1
  extra created := false
  query_queue push extra
  var Int pending := 1
  var Int running := 1
  thread # send requests
    share s pending running
    share query_queue answer_queue
    while pending>0
      var Pointer:ClusterPoint p :> query_queue pop_last
      if exists:p
        if verbose_data
          console (shunt p:action=0 "dsign " "dread ")+(stri
        s writeline (shunt p:action=0 "dsign " "dread ")+(st
        answer_queue push p
      else
        s flush anytime
        sleep sleep_interval
    atomic_add running -1
  while pending>0 # read answers
    time_filter_prototype (var CBool start) (var CBool run) 
    if not run and mode>1
      mode := 1
    var Pointer:ClusterPoint p :> answer_queue pop_first
    if exists:p
      var Data_ data := data_root search_path p:dest true
      var CBool drop := true
      var Str l := s readline
      if p:action=0
        if (l parse (var Str sign)) and sign=p:sign
          r data_total += p count
        else
          p action := 1
          query_queue push p ; drop := false
      eif p:action=1
        if (l parse (var Str value))
      var CBool drop := true
      var Str l := s readline
      if p:action=0
        if (l parse (var Str sign)) and sign=p:sign
          r data_total += p count
        else
          p action := 1
          query_queue push p ; drop := false
      eif p:action=1
        if (l parse (var Str value))
          p:data:base:sem request
          data:base:sem request
          r data_total += 1
          if p:created
            r data_append += 1
          r data_total += 1
          if p:created
            r data_append += 1
            if (data_filter_prototype p:data value source co
              if (p:data:interface set p:data (addressof val
                status := failure "Failed to write data "+p:
            if (data_filter_prototype data value source computer_fullname mode filter)>=2
              if (data:interface set data (addressof value) Str)=failure
                status := failure "Failed to write data "+data:path
                if debug_data
                if debug_data
                  console "Failed to write data " p:data:pat
                  console "Failed to write data " data:path eol
          else
          else
            if (p:data:interface get p:data addressof:(var S
            if (data:interface get data addressof:(var Str current) Str)=success
              if value<>current
                r data_modified += 1
              if value<>current
                r data_modified += 1
                if (data_filter_prototype p:data value sourc
                  if (p:data:interface set p:data (addressof
                    status := failure "Failed to write data 
                if (data_filter_prototype data value source computer_fullname mode filter)>=3
                  if (data:interface set data (addressof value) Str)=failure
                    status := failure "Failed to write data "+data:path
                    if debug_data
                    if debug_data
                      console "Failed to write data " p:data
                      console "Failed to write data " data:path eol
            else
            else
              status := failure "Failed to read data "+p:dat
              status := failure "Failed to read data "+data:path
              if debug_data
                console "Failed to read data " p:data:path e
              if debug_data
                console "Failed to read data " p:data:path e
          p:data:base:sem release
          data:base:sem release
        var (Dictionary Str Int) list := var (Dictionary Str
        while (s:readline parse (var Str key))
          list insert key 0
        each k list
          var Pointer:ClusterPoint extra :> new_point
          extra src := p:src+"/"+html_encode:(list key k) 
        var (Dictionary Str Int) list := var (Dictionary Str
        while (s:readline parse (var Str key))
          list insert key 0
        each k list
          var Pointer:ClusterPoint extra :> new_point
          extra src := p:src+"/"+html_encode:(list key k) 
          p:data:base:sem rd_request
          extra data := p:data:interface search p:data (list
          var CBool include := (data_filter_prototype extra:
          p:data:base:sem rd_release
          extra dest := p:dest+"/"+html_encode:(list key k) 
          data:base:sem rd_request
          var Data_ data2 := data:interface search data (list key k)
          var CBool include := (data_filter_prototype data2 "" source computer_fullname mode filter)>0
          data:base:sem rd_release
          if include
          if include
            extra created := p:created or extra:data:adr=nul
            if extra:data:adr=null and (data_filter_prototyp
              p:data:base:sem request
              p:data:interface create p:data (list key k)
              extra data := p:data:interface search p:data (
              p:data:base:sem release
            extra created := p:created or data2:adr=null
            if data2:adr=null and (data_filter_prototype data2 "" source computer_fullname mode filter)>=2
              data:base:sem request
              data:interface create data (list key k)
              data:base:sem release
            if not extra:created
            if not extra:created
              extra sign := data_sign extra:data filter sign
              extra sign := data_sign data2 filter sign_limit extra:count
              extra action := shunt extra:count>1 0 1
            else
              extra action := 1
            query_queue push extra
            atomic_add pending 1
          else
            drop_point extra
        if not p:created and s=success
              extra action := shunt extra:count>1 0 1
            else
              extra action := 1
            query_queue push extra
            atomic_add pending 1
          else
            drop_point extra
        if not p:created and s=success
          p:data:base:sem rd_request
          var Int size := p:data:interface count p:data "" "
          p:data:base:sem rd_release
          data:base:sem rd_request
          var Int size := data:interface count data "" ""
          data:base:sem rd_release
          if list:size<size
          if list:size<size
            p:data:base:sem request
            var Data_ cur := p:data:interface first p:data "
            data:base:sem request
            var Data_ cur := data:interface first data "" "" (var DataScanBuffer buf)
            while cur:adr<>null
              if not exists:(list first cur:key)
                if (data_filter_prototype cur "" source comp
                  r data_deleted += cur count
                  if (data_filter_prototype cur "" source co
            while cur:adr<>null
              if not exists:(list first cur:key)
                if (data_filter_prototype cur "" source comp
                  r data_deleted += cur count
                  if (data_filter_prototype cur "" source co
                    p:data:interface delete p:data cur:key
              cur := p:data:interface next p:data "" "" buf
            p:data:base:sem release
                    data:interface delete data cur:key
              cur := data:interface next data "" "" buf
            data:base:sem release
      if drop
        drop_point p
        atomic_add pending -1
    else
      sleep sleep_interval
  while running>0
    sleep sleep_interval



export cluster_sync cluster_daemon
      if drop
        drop_point p
        atomic_add pending -1
    else
      sleep sleep_interval
  while running>0
    sleep sleep_interval



export cluster_sync cluster_daemon