/pliant/appli/cluster/daemon.pli
 
 1  module "/pliant/language/compiler.pli" 
 2  module "/pliant/language/context.pli" 
 3  module "/pliant/language/stream.pli" 
 4  module "database.pli" 
 5  module "common.pli" 
 6  module "/pliant/protocol/dns/name.pli" 
 7  module "/pliant/util/remote/common.pli" 
 8  module "/pliant/util/encoding/html.pli" 
 9  module "/pliant/util/crypto/channel.pli" 
 10  module "/pliant/language/schedule/daemon.pli" 
 11  module "/pliant/admin/file.pli" 
 12   
 13  constant verbose_data false 
 14  constant debug_data false 
 15  constant verbose_file false 
 16  constant debug_file false 
 17  constant verbose_report false 
 18  constant sleep_interval 0.01 
 19   
 20   
 21  method d count -> c 
 22    arg Data_ d ; arg Int c 
 23    c := 1 
 24    var Data_ cur := d:interface first_to_store d "" "" (var DataScanBuffer buf) 
 25    while cur:adr<>null 
 26      c += cur count 
 27      cur := d:interface next_to_store d "" "" buf 
 28   
 29   
 30  type ClusterReport 
 31      field Int file_total_count <- 0 ; field Intn file_total_size <- 0 
 32      field Int file_append_count <- 0 ; field Intn file_append_size <- 0 
 33      field Int file_modified_count  <- 0; field Intn file_modified_size <- 0 
 34      field Int file_deleted_count <- 0 ; field Intn file_deleted_size <- 0 
 35      field Int data_total <- 0 
 36      field Int data_append <- 0 
 37      field Int data_modified <- 0 
 38      field Int data_deleted <- 0 
 39   
 40   
 41  type ClusterPoint 
 42    field ListNode_ node 
 43    field Str src dest 
 44    field Int action 
 45    field Str sign ; field Int count ; field Intn size 
 46    field CBool created 
 47   
 48  type ClusterQueue 
 49    field List_ list 
 50    field FastSem sem 
 51   
 52  function new_point -> p 
 53    arg_RW ClusterPoint p 
 54    p :> (memory_allocate ClusterPoint:size null) map ClusterPoint 
 55    ClusterPoint build_instance addressof:p 
 56   
 57  function drop_point p 
 58    arg_rw ClusterPoint p 
 59    ClusterPoint destroy_instance addressof:p 
 60    memory_free addressof:p 
 61   
 62  method q push p 
 63    arg_rw ClusterQueue q ; arg_rw ClusterPoint p 
 64    q:sem request 
 65    q:list append p:node 
 66    q:sem release 
 67   
 68  method q pop_first -> p 
 69    arg_rw ClusterQueue q ; arg_RW ClusterPoint p 
 70    q:sem request 
 71    p :> (addressof q:list:first) map ClusterPoint 
 72    if exists:p 
 73      q:list remove p:node 
 74    q:sem release 
 75   
 76  method q pop_last -> p 
 77    arg_rw ClusterQueue q ; arg_RW ClusterPoint p 
 78    q:sem request 
 79    p :> (addressof q:list:last) map ClusterPoint 
 80    if exists:p 
 81      q:list remove p:node 
 82    q:sem release 
 83   
 84  function data_sync s source area src_path dest_path mode0 filter sign_limit r status 
 85    arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea area ; arg Str src_path dest_path ; arg Int mode0 ; arg Function filter ; arg Int sign_limit ; arg_rw ClusterReport r ; arg_rw ExtendedStatus status 
 86    var Int mode := mode0 
 87    var Link:Function time_filter :> cluster_filter area "time" (var Str err) 
 88    var ClusterQueue query_queue answer_queue 
 89    var Pointer:ClusterPoint extra :> new_point 
 90    extra src := src_path 
 91    extra dest := dest_path 
 92    var Data_ data := data_root search_path dest_path true 
 93    extra sign := data_sign data filter sign_limit extra:count 
 94    extra action := shunt extra:count>1 0 1 
 95    extra created := false 
 96    query_queue push extra 
 97    var Int pending := 1 
 98    var Int running := 1 
 99    thread # send requests 
 100      share s pending running 
 101      share query_queue answer_queue 
 102      while pending>0 
 103        var Pointer:ClusterPoint p :> query_queue pop_last 
 104        if exists:p 
 105          if verbose_data 
 106            console (shunt p:action=0 "dsign " "dread ")+(string p:src) eol 
 107          s writeline (shunt p:action=0 "dsign " "dread ")+(string p:src) 
 108          answer_queue push p 
 109        else 
 110          s flush anytime 
 111          sleep sleep_interval 
 112      atomic_add running -1 
 113    while pending>0 # read answers 
 114      time_filter_prototype (var CBool start) (var CBool run) time_filter 
 115      if not run and mode>1 
 116        mode := 1 
 117      var Pointer:ClusterPoint p :> answer_queue pop_first 
 118      if exists:p 
 119        var Data_ data := data_root search_path p:dest true 
 120        var CBool drop := true 
 121        var Str l := s readline 
 122        if p:action=0 
 123          if (l parse (var Str sign)) and sign=p:sign 
 124            r data_total += p count 
 125          else 
 126            p action := 1 
 127            query_queue push p ; drop := false 
 128        eif p:action=1 
 129          if (l parse (var Str value)) 
 130            data:base:sem request 
 131            r data_total += 1 
 132            if p:created 
 133              r data_append += 1 
 134              if (data_filter_prototype data value source computer_fullname mode filter)>=2 
 135                if (data:interface set data (addressof value) Str)=failure 
 136                  status := failure "Failed to write data "+data:path 
 137                  if debug_data 
 138                    console "Failed to write data " data:path eol 
 139            else 
 140              if (data:interface get data addressof:(var Str current) Str)=success 
 141                if value<>current 
 142                  r data_modified += 1 
 143                  if (data_filter_prototype data value source computer_fullname mode filter)>=3 
 144                    if (data:interface set data (addressof value) Str)=failure 
 145                      status := failure "Failed to write data "+data:path 
 146                      if debug_data 
 147                        console "Failed to write data " data:path eol 
 148              else 
 149                status := failure "Failed to read data "+data:path 
 150                if debug_data 
 151                  console "Failed to read data " p:data:path eol 
 152            data:base:sem release 
 153          var (Dictionary Str Int) list := var (Dictionary Str Int) empty_dict 
 154          while (s:readline parse (var Str key)) 
 155            list insert key 0 
 156          each k list 
 157            var Pointer:ClusterPoint extra :> new_point 
 158            extra src := p:src+"/"+html_encode:(list key k)  
 159            extra dest := p:dest+"/"+html_encode:(list key k)  
 160            data:base:sem rd_request 
 161            var Data_ data2 := data:interface search data (list key k) 
 162            var CBool include := (data_filter_prototype data2 "" source computer_fullname mode filter)>0 
 163            data:base:sem rd_release 
 164            if include 
 165              extra created := p:created or data2:adr=null 
 166              if data2:adr=null and (data_filter_prototype data2 "" source computer_fullname mode filter)>=2 
 167                data:base:sem request 
 168                data:interface create data (list key k) 
 169                data:base:sem release 
 170              if not extra:created 
 171                extra sign := data_sign data2 filter sign_limit extra:count 
 172                extra action := shunt extra:count>1 0 1 
 173              else 
 174                extra action := 1 
 175              query_queue push extra 
 176              atomic_add pending 1 
 177            else 
 178              drop_point extra 
 179          if not p:created and s=success 
 180            data:base:sem rd_request 
 181            var Int size := data:interface count data "" "" 
 182            data:base:sem rd_release 
 183            if list:size<size 
 184              data:base:sem request 
 185              var Data_ cur := data:interface first data "" "" (var DataScanBuffer buf) 
 186              while cur:adr<>null 
 187                if not exists:(list first cur:key) 
 188                  if (data_filter_prototype cur "" source computer_fullname mode filter)>0 
 189                    r data_deleted += cur count 
 190                    if (data_filter_prototype cur "" source computer_fullname mode filter)>=4 
 191                      data:interface delete data cur:key 
 192                cur := data:interface next data "" "" buf 
 193              data:base:sem release 
 194        if drop 
 195          drop_point p 
 196          atomic_add pending -1 
 197      else 
 198        sleep sleep_interval 
 199    while running>0 
 200      sleep sleep_interval 
 201   
 202   
 203  function file_sync s source area src_path dest_path mode0 filter sign_limit r status 
 204    arg_rw Stream s ; arg Str source ; arg_rw Data:ClusterArea area ; arg Str src_path dest_path ; arg Int mode0 ; arg Function filter ; arg Int sign_limit ; arg_rw ClusterReport r ; arg_rw ExtendedStatus status 
 205    var Int mode := mode0 
 206    var Link:Function time_filter :> cluster_filter area "time" (var Str err) 
 207    var CBool clear := area clear 
 208    var ClusterQueue query_queue answer_queue 
 209    var Pointer:ClusterPoint extra :> new_point 
 210    extra src := src_path 
 211    extra dest := dest_path 
 212    extra sign := file_sign dest_path filter sign_limit extra:count extra:size 
 213    extra action := shunt extra:sign<>"" 0 1 
 214    query_queue push extra 
 215    var Int pending := 1 
 216    var Int running := 1 
 217    thread # send requests 
 218      share s pending running 
 219      share query_queue answer_queue 
 220      while pending>0 
 221        var Pointer:ClusterPoint p :> query_queue pop_last 
 222        if exists:p 
 223          if verbose_file 
 224            console (shunt p:action=0 "fsign " p:action=1 "flist " "fread ")+(string p:src)+(shunt p:action=2 and clear " clear" "") eol 
 225          s writeline (shunt p:action=0 "fsign " p:action=1 "flist " "fread ")+(string p:src)+(shunt p:action=2 and clear " clear" "") 
 226          answer_queue push p 
 227        else 
 228          s flush anytime 
 229          sleep sleep_interval 
 230      atomic_add running -1 
 231    while pending>0 # read answers 
 232      time_filter_prototype (var CBool start) (var CBool run) time_filter 
 233      if not run and mode>1 
 234        mode := 1 
 235      var Pointer:ClusterPoint p :> answer_queue pop_first 
 236      if exists:p 
 237        var CBool drop := true 
 238        if p:action=0 
 239          var Str l := s readline 
 240          if (l parse (var Str sign)) and sign=p:sign 
 241            r file_total_count += p count ; r file_total_size += p size 
 242          else 
 243            p action := 1 
 244            query_queue push p ; drop := false 
 245        eif p:action=1 
 246          var (Dictionary Str Int) src_list := var (Dictionary Str Int) empty_dict 
 247          var (Dictionary Str Int) dest_list := var (Dictionary Str Int) empty_dict 
 248          var Array:FileInfo files := file_list p:dest extended+relative+directories+deadlinks 
 249          for (var Int i) 0 files:size-1 
 250            dest_list insert files:i:name i 
 251          while (s:readline parse (var Str name) (var Intn size) (var DateTime timestamp) any:(var Str options)) 
 252            if (file_filter_prototype p:dest+name source computer_fullname mode filter)>0 
 253              var Pointer:Int ptr_dest :> dest_list first name 
 254              var Pointer:FileInfo info 
 255              if exists:ptr_dest 
 256                info :> files ptr_dest 
 257              else 
 258                info :> null map FileInfo 
 259              if (options option "link") 
 260                r file_total_count += 1 
 261                if not exists:info 
 262                  r file_append_count += 1 
 263                  if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=2 
 264                    file_link "file:"+(options option "link" Str) p:dest+name 
 265                eif info:link<>(options option "link" Str) 
 266                  console info:link " -> " (options option "link" Str) eol 
 267                  r file_modified_count += 1 
 268                  if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3 
 269                    file_delete p:dest+name 
 270                    file_link "file:"+(options option "link" Str) p:dest+name 
 271              eif name:len>0 and (name name:len-1)="/" 
 272                if exists:info and (info:is_link or not info:is_directory) 
 273                  if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3 
 274                    file_delete p:dest+name 
 275                var Pointer:ClusterPoint extra :> new_point 
 276                extra src := p:src+name 
 277                extra dest := p:dest+name 
 278                extra sign := file_sign extra:dest filter sign_limit extra:count extra:size 
 279                extra action := shunt extra:sign<>"" 0 1 
 280                query_queue push extra 
 281                atomic_add pending 1 
 282              else # simple file 
 283                r file_total_count += 1 ; r file_total_size += size 
 284                if not exists:info 
 285                  r file_append_count += 1 ; r file_append_size += size 
 286                  if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=2 
 287                    var Pointer:ClusterPoint extra :> new_point 
 288                    extra src := p:src+name 
 289                    extra dest := p:dest+name 
 290                    extra sign := "datetime "+(string timestamp)+" "+options 
 291                    extra action := 2 
 292                    query_queue push extra 
 293                    atomic_add pending 1 
 294                eif info:size<>size or info:datetime<>timestamp 
 295                  r file_modified_count += 1 ; r file_modified_size += size 
 296                  if (file_filter_prototype p:dest+name source computer_fullname mode filter)>=3 
 297                    if info:is_link 
 298                      file_delete p:dest+name 
 299                    eif info:is_directory 
 300                      file_tree_delete p:dest+name 
 301                    var Pointer:ClusterPoint extra :> new_point 
 302                    extra src := p:src+name 
 303                    extra dest := p:dest+name 
 304                    extra sign := "datetime "+(string timestamp)+" "+options 
 305                    extra action := 2 
 306                    query_queue push extra 
 307                    atomic_add pending 1 
 308            src_list insert name 0 
 309          if s=success 
 310            for (var Int i) 0 files:size-1 
 311              if not exists:(src_list first files:i:name) 
 312                if (file_filter_prototype p:dest+files:i:name source computer_fullname mode filter)>0 
 313                  if files:i:is_link 
 314                    r file_deleted_count += 1 
 315                  eif files:i:is_directory 
 316                    var Array:FileInfo files2 := file_list p:dest+files:i:name extended+recursive 
 317                    for (var Int j) 0 files2:size-1 
 318                      if not files2:j:is_directory and not files2:j:is_link 
 319                        r file_deleted_count += 1 ; r file_deleted_size += files2:size 
 320                  else 
 321                    r file_deleted_count += 1 ; r file_deleted_size += files:i:size 
 322                  if (file_filter_prototype p:dest+files:i:name source computer_fullname mode filter)>=4 
 323                    if files:i:is_link 
 324                      file_delete p:dest+files:i:name 
 325                    eif files:i:is_directory 
 326                      file_tree_delete p:dest+files:i:name 
 327                    else 
 328                      file_delete p:dest+files:i:name 
 329        eif p:action=2 
 330          var Str l := s readline 
 331          if (l parse (var Intn remain)) 
 332            var Pointer:Stream s2 :> s 
 333            if clear 
 334              s2 :> s channel_support 
 335            var Str ext := ".tmp" 
 336            if p:dest:len>0 and (p:dest p:dest:len-1)="." 
 337              ext := "tmp" 
 338            if (file_query p:dest+ext standard)=defined 
 339              ext := "" 
 340            (var Stream file) open p:dest+ext out+safe+mkdir 
 341            part copy "receive "+p:dest 
 342              while remain>0 
 343                var Int step 
 344                if remain>=2^24 
 345                  step := 2^24 
 346                else 
 347                  step := remain 
 348                step := raw_copy s2 file step step 
 349                remain -= step 
 350                if step=0 
 351                  leave copy 
 352            var CBool write_ok := remain=0 
 353            var CBool close_ok := file:close=success 
 354            part fill 
 355              while remain<>0 
 356                var Int step 
 357                if remain>=2^24 
 358                  step := 2^24 
 359                else 
 360                  step := remain 
 361                s2 read_available (var Address adr) (var Int step) step 
 362                remain -= step 
 363                if step=0 
 364                  leave fill 
 365            var CBool read_ok := s:readline="ok" 
 366            if read_ok and write_ok and close_ok 
 367              file_configure p:dest+ext p:sign 
 368              if ext<>"" 
 369                file_delete p:dest 
 370                file_move p:dest+ext p:dest 
 371            else 
 372              file_delete p:dest+ext 
 373              status := failure "Failed to "+(shunt not read_ok "read" "write")+" file "+p:dest 
 374              if debug_data 
 375                console "Failed to "+(shunt ok "read" "write")+" file "+p:dest eol 
 376        if drop 
 377          drop_point p 
 378          atomic_add pending -1 
 379      else 
 380        sleep sleep_interval 
 381    while running>0 
 382      sleep sleep_interval 
 383   
 384   
 385  function cluster_sync areaid path mode0 -> status 
 386    arg Str areaid path mode0 ; arg ExtendedStatus status 
 387    # 'path' is related to the destination side 
 388    var Int mode := shunt mode0="count" 1 mode0="append" 2 mode0="modify" 3 mode0="delete" 4 0 
 389    var Data:ClusterArea area :> cluster_database:data:area areaid 
 390    if not exists:area 
 391      return (failure "There is no '"+areaid+"' clustering area.") 
 392    var Str err := "" 
 393    each m area:module 
 394      pliant_compiler_semaphore request 
 395      if not exists:(pliant_load_module m the_module:"/pliant/language/basic/safe.pli" 0 (null map Module)) 
 396        err := "Failed to compile module "+m 
 397      pliant_compiler_semaphore release 
 398    if err<>"" 
 399      area error := err 
 400      return failure:err 
 401    var Link:Function file_filter :> cluster_filter area "dest_file" (var Str err) 
 402    if err<>"" 
 403      area error := err 
 404      return failure:"Failed to compile file filtering formula" 
 405    var Link:Function data_filter :> cluster_filter area "dest_data" (var Str err) 
 406    if err<>"" 
 407      area error := err 
 408      return failure:"Failed to compile data filtering formula" 
 409    area start := datetime 
 410    area stop := undefined 
 411    area status := "R" 
 412    var ClusterReport r 
 413    status := success 
 414    each server area:src_computer 
 415      var Int port := (name_database:data:host keyof:server) remote_port 
 416      var Stream s 
 417      var ExtendedStatus os := s open "zchannel://"+keyof:server+"/site/"+(string (shunt port=defined port remote_tcp_port)+1)+"/"+computer_fullname in+out+noautopost+bigcache+safe 
 418      if os=success 
 419        if area:timeout=defined 
 420          s configure "timeout "+(string area:timeout) 
 421        s writeline "area "+string:areaid ; s flush anytime 
 422        var Str l := s readline 
 423        if l="ok" 
 424          each p area:data_path 
 425            if p:src<>"" and p:dest<>"" # syncing all databases at once is probably a very bad idea 
 426              if (p:dest 0 path:len)=path 
 427                data_sync s keyof:server area p:src p:dest mode data_filter area:data_sign_limit r status 
 428              eif (path 0 p:dest:len)=p:dest 
 429                data_sync s keyof:server area p:src+(path p:dest:len path:len) path mode data_filter area:data_sign_limit r status 
 430          each p area:file_path 
 431            if p:src<>"" and p:dest<>"" 
 432              if (p:dest 0 path:len)=path 
 433                file_sync s keyof:server area p:src p:dest mode file_filter area:file_sign_limit r status 
 434              eif (path 0 p:dest:len)=p:dest 
 435                file_sync s keyof:server area p:src+(path p:dest:len path:len) path mode file_filter area:file_sign_limit r status 
 436        else 
 437          status := failure "'"+keyof:server+"' "+(shunt l="error" "reported internal error" "has no '"+areaid+"' clustering area") 
 438      else 
 439        status := failure "Failed to connect to '"+keyof:server+"' ("+os:message+")" 
 440    area error := shunt status=success "" status:message 
 441    area file_total_count := r file_total_count ; area file_total_size := r file_total_size 
 442    area file_append_count := r file_append_count ; area file_append_size := r file_append_size 
 443    area file_modified_count := r file_modified_count ; area file_modified_size := r file_modified_size 
 444    area file_deleted_count := r file_deleted_count ; area file_deleted_size := r file_deleted_size 
 445    area data_total := r data_total 
 446    area data_append := r data_append 
 447    area data_modified := r data_modified 
 448    area data_deleted := r data_deleted 
 449    area stop := datetime 
 450    area status := " " 
 451    var Link:Function post :> cluster_filter area "post" (var Str err) 
 452    post_process_prototype area post 
 453    if verbose_report 
 454      console "data total " area:data_total " append " area:data_append " modified " area:data_modified " deleted " area:data_deleted eol 
 455      console "file total " area:file_total_count ":" area:file_total_size " append " area:file_append_count ":" area:file_append_size " modified " area:file_modified_count ":" area:file_modified_size " deleted " area:file_deleted_count ":" area:file_deleted_size eol 
 456      console (shunt status=success "done" "failed ("+status:message+")")+" in " (string area:stop:seconds-area:start:seconds "fixed 0") " seconds" eol 
 457   
 458   
 459  function cluster_daemon 
 460    daemon "clustering daemon" 
 461      daemon_sleep 120 
 462      while not daemon_emergency 
 463        each area cluster_database:data:area filter (exists area:dest_computer:computer_fullname) and area:mode<>"" 
 464          if area:status=" " and area:frequency=defined and datetime:seconds-area:stop:seconds>area:frequency 
 465            var Link:Function filter :> cluster_filter area "time" (var Str err) 
 466            time_filter_prototype (var CBool start) (var CBool run) filter 
 467            if start and run 
 468              area status := "R" 
 469              thread 
 470                part sync "synchronise area "+keyof:area 
 471                  cluster_sync keyof:area "" area:mode  
 472        daemon_sleep 15 
 473   
 474  export cluster_sync cluster_daemon