/pliant/util/remote/client.pli
 
 1  module "/pliant/language/compiler.pli" 
 2  module "/pliant/language/parser.pli" 
 3  module "/pliant/language/context.pli" 
 4  module "/pliant/language/stream.pli" 
 5  module "/pliant/util/crypto/channel.pli" 
 6  module "/pliant/language/data/string_cast.pli" 
 7  module "/pliant/protocol/dns/name.pli" 
 8  module "common.pli" 
 9   
 10  module "/pliant/protocol/dns/client.pli" # force using Pliant names database 
 11   
 12  (gvar TraceSlot remote_trace) configure "remote execution client" 
 13   
 14   
 15  function execute_within_context a f 
 16    arg Address a ; arg Function f 
 17    indirect 
 18   
 19  function remote_execute da fun server 
 20    arg_rw DelayedAction da ; arg RemoteFunction fun ; arg Str server 
 21    (var TraceSession log) bind remote_trace 
 22    if server=computer_fullname 
 23      if not (exists fun:local_streaming) and not (exists fun:remote_streaming) 
 24        if fun:host_offset=defined 
 25          (da:parameter translate Byte fun:host_offset) map Str := computer_fullname 
 26        execute_within_context da:parameter fun:body 
 27        if (exists fun:success_code) 
 28          execute_within_context da:parameter fun:success_code 
 29        return 
 30    var Pointer:Str message 
 31    if fun:message_offset=defined 
 32      message :> (da:parameter translate Byte fun:message_offset) map Str 
 33    else 
 34      message :> var Str drop 
 35    message := "" 
 36    var Link:Stream :> new Stream 
 37    var Int port := name_database:data:host:server remote_port 
 38    open "zchannel://"+server+"/site/"+string:(shunt port=defined port remote_tcp_port)+"/"+computer_fullname in+out+safe 
 39    if s=failure 
 40      message := "failed to connect to "+server 
 41      log trace "failed to connect to " server 
 42    writeline "call "+fun:id 
 43    log trace "call "+fun:id+" on "+server 
 44    var Pointer:Int index :> fun:ro_list first 
 45    while exists:index 
 46      var Pointer:TypeField tf :> fun:type field index 
 47      var Str value := to_string (da:parameter translate Byte tf:offset) tf:type "" 
 48      writeline "set "+tf:name+" "+value 
 49      log trace "output " tf:name+" "+value 
 50      index :> fun:ro_list next index 
 51    writeline "" 
 52    while { var Str line := readline ; line<>"" } 
 53      if (line parse word:"set" _ any:(var Str variable) _ any:(var Str value)) 
 54        var Pointer:Int index :> fun:rw_dict first variable 
 55        if exists:index 
 56          var Pointer:TypeField tf :> fun:type field index 
 57          from_string (da:parameter translate Byte tf:offset) tf:type value "" 
 58          log trace "input " variable " " value 
 59        else 
 60          log trace "unexpected input " variable " " value 
 61      eif (line parse word:"failure" (var Str msg)) 
 62        message := msg 
 63        log trace msg 
 64      else 
 65        log trace "unsupported line: " line 
 66    if message="" and s=failure 
 67      message := "remote process crashed" 
 68    if message="" 
 69      if (exists fun:local_streaming) and fun:ls_offset=defined 
 70        (da:parameter translate Byte fun:ls_offset) map Address := addressof s 
 71        execute_within_context da:parameter fun:local_streaming 
 72      if (exists fun:success_code) 
 73        execute_within_context da:parameter fun:success_code 
 74    else 
 75      if (exists fun:failure_code) 
 76        execute_within_context da:parameter fun:failure_code 
 77   
 78   
 79   
 80  dual_keyword remote 2 14 local_streaming 2 2 
 81  dual_keyword remote 2 14 remote_streaming 2 2 
 82  dual_keyword remote 2 14 success 1 1 
 83  dual_keyword remote 2 14 failure 1 2 
 84   
 85  method da get_address -> a 
 86    arg DelayedAction da ; arg Address a 
 87    := da parameter 
 88   
 89  constant to_index (the_function '. to string' Universal Str -> Str):generic_index 
 90   
 91  named_expression control_prototype 
 92    var Str v := 'remote host' 
 93    if not c 
 94      'remote rejected' := true 
 95      return 
 96   
 97  meta control e 
 98    if e:size=2 
 99      compile_as (expression duplicate control_prototype substitute v e:0 substitute c e:1) 
 100   
 101  method type offset field_name -> offset 
 102    arg Type type ; arg Str field_name ; arg Int offset 
 103    for (var Int i) type:nb_fields-1 
 104      var Pointer:TypeField tf :> type field i 
 105      if tf:name=field_name 
 106        return tf:offset 
 107    offset := undefined 
 108   
 109  meta modify e 
 110    var Pointer:Arrow :> e:module first "pliant remote modify" 
 111    if r=null or entry_type:r<>Dictionary 
 112      return 
 113    var Pointer:Dictionary modify :> map Dictionary 
 114    if e:size<1 
 115      return 
 116    for (var Int i) e:size-1 
 117      if not e:i:is_pure_ident 
 118        return 
 119    for (var Int i) e:size-1 
 120      e:compile ? 
 121    for (var Int i) e:size-1 
 122      modify insert e:i:ident true addressof:void 
 123    if e:size=1 
 124      set_result e:0:result e:0:access 
 125    else 
 126      set_void_result 
 127   
 128  meta ignore e 
 129    var Pointer:Arrow :> e:module first "pliant remote ignore" 
 130    if r=null or entry_type:r<>Dictionary 
 131      return 
 132    var Pointer:Dictionary ignore :> map Dictionary 
 133    if e:size<1 
 134      return 
 135    for (var Int i) e:size-1 
 136      if not e:i:is_pure_ident 
 137        return 
 138    for (var Int i) e:size-1 
 139      e:compile ? 
 140    for (var Int i) e:size-1 
 141      ignore insert e:i:ident true addressof:void 
 142    if e:size=1 
 143      set_result e:0:result e:0:access 
 144    else 
 145      set_void_result 
 146   
 147  meta remote e 
 148    var Pointer:Arrow :> pliant_general_dictionary first "pliant function" 
 149    if c=null or entry_type:c<>Function 
 150      return 
 151    var Link:Function current_function :> map Function 
 152    if e:size<or not (e:cast Str) 
 153      return 
 154    var Str id 
 155    var Pointer:Expression body 
 156    var Int ls_index := undefined 
 157    var Int rs_index := undefined 
 158    var Int success_index := undefined 
 159    var Int failure_index := undefined 
 160    var Int failure2_index := undefined 
 161    var Int i 
 162    if e:size>=and e:1:ident="id" and (e:constant Str)<>null 
 163      id := (e:constant Str) map Str 
 164      body :> 3 
 165      := 4 
 166    else 
 167      id := e:external_module:name+" "+current_function:name 
 168      body :> 1 
 169      := 2 
 170    while i<e:size 
 171      if i+2<e:size and e:i:ident="local_streaming" and e:(i+1):ident<>"" and e:(i+2):ident="{}" 
 172        ls_index := i 
 173        += 3 
 174      eif i+2<e:size and e:i:ident="remote_streaming" and e:(i+1):ident<>"" and e:(i+2):ident="{}" 
 175        rs_index := i 
 176        += 3 
 177      eif i+1<e:size and e:i:ident="success" and e:(i+1):ident="{}" 
 178        success_index := i 
 179        += 2 
 180      eif i+1<e:size and e:i:ident="failure" and e:(i+1):ident="{}" and failure2_index=undefined 
 181        failure_index := i 
 182        += 2 
 183      eif i+2<e:size and e:i:ident="failure" and e:(i+1):ident<>"" and e:(i+2):ident="{}" and failure_index=undefined 
 184        failure2_index := i 
 185        += 3 
 186      else 
 187        console "i = " eol 
 188        return 
 189    suckup e:0 
 190    var Pointer:Module module :> module ; var Address mark := module mark 
 191    local_variable "remote host" Str 
 192    local_variable "remote rejected" CBool 
 193    var Link:List expressions :> new List 
 194    var Link:List byaddress :> new List 
 195    expressions append addressof:body 
 196    if ls_index=defined 
 197      local_variable e:(ls_index+1) Stream 
 198      expressions append addressof:(ls_index+2) 
 199    if rs_index=defined 
 200      local_variable e:(rs_index+1) Stream 
 201      expressions append addressof:(rs_index+2) 
 202    if success_index=defined 
 203      expressions append addressof:(success_index+1) 
 204    if failure_index=defined 
 205      expressions append addressof:(failure_index+1) 
 206    if failure2_index=defined 
 207      local_variable e:(failure2_index+1) Str 
 208      expressions append addressof:(failure2_index+2) 
 209    var List functions ; var Link:Type type 
 210    module define "control" addressof:(the_meta control) 
 211    module define "modify" addressof:(the_meta modify) 
 212    module define "ignore" addressof:(the_meta ignore) 
 213    module define "pliant shared" addressof:byaddress 
 214    module define "share" addressof:(the_meta 'pliant share arguments') 
 215    var Link:Dictionary modify :> new Dictionary 
 216    module define "pliant remote modify" addressof:modify 
 217    var Link:Dictionary ignore :> new Dictionary 
 218    module define "pliant remote ignore" addressof:ignore 
 219    freeze expressions byaddress functions type 
 220    module rewind mark 
 221    void ? 
 222    var Pointer:Arrow ba :> byaddress first 
 223    while ba<>null 
 224      ignore insert (cast (ba map Ident) Str) true addressof:void 
 225      ba :> byaddress next ba 
 226    var Link:RemoteFunction fun :> new RemoteFunction 
 227    fun body :> functions:first map Function 
 228    fun type :> type 
 229    fun id := id 
 230    fun host_offset := type offset "remote host" 
 231    fun rejected_offset := type offset "remote rejected" 
 232    for (var Int i) type:nb_fields-1 
 233      var Pointer:TypeField tf :> type field i 
 234      if (tf:name search " " -1)=(-1) and (ignore first tf:name)=null 
 235        if ls_index=undefined or tf:name<>e:(ls_index+1):ident 
 236          if rs_index=undefined or tf:name<>e:(rs_index+1):ident 
 237            if failure2_index=undefined or tf:name<>e:(failure2_index+1):ident 
 238              var Pointer:Function conv :> tf:type get_generic_method to_index 
 239              if not exists:conv or conv=(the_function '. to string' Universal Str -> Str) 
 240                error error_id_compile "Cannot convert '"+tf:name+"' to a string that could be passed to the remote computer." 
 241                return 
 242              fun:ro_dict insert tf:name i 
 243              fun ro_list += i 
 244              if (modify first tf:name)<>null 
 245                fun:rw_dict insert tf:name i 
 246                fun rw_list += i 
 247    var Pointer:Arrow :> functions first 
 248    if ls_index=defined 
 249      :> functions next c 
 250      fun local_streaming :> map Function 
 251      fun ls_offset := type offset e:(ls_index+1):ident 
 252    if rs_index=defined 
 253      :> functions next c 
 254      fun remote_streaming :> map Function 
 255      fun rs_offset := type offset e:(rs_index+1):ident 
 256    if success_index=defined 
 257      :> functions next c 
 258      fun success_code :> map Function 
 259    if failure_index=defined 
 260      :> functions next c 
 261      fun failure_code :> map Function 
 262    if failure2_index=defined 
 263      :> functions next c 
 264      fun failure_code :> map Function 
 265      fun message_offset := type offset e:(failure2_index+1):ident 
 266    add (instruction (the_function remote_execute DelayedAction RemoteFunction Str) body:result (argument mapped_constant RemoteFunction fun) e:0:result) 
 267    var Link:Argument adr :> argument local Address 
 268    add (instruction (the_function '. get_address' DelayedAction -> Address) body:result adr) 
 269    var Pointer:Int index :> fun:ro_list first 
 270    while exists:index 
 271      var Pointer:TypeField tf :> fun:type field index 
 272      var Pointer:Arrow :> pliant_general_dictionary first tf:name 
 273      if c<>null and entry_type:c=LocalVariable 
 274        var Link:LocalVariable :> map LocalVariable 
 275        var Link:Argument field :> argument indirect Address adr tf:offset 
 276        add (instruction (the_function 'copy Universal' Universal Universal Type) field l:body (argument mapped_constant Type tf:type)) 
 277      index :> fun:ro_list next index 
 278    remote_sem request 
 279    remote_functions remove id null 
 280    remote_functions insert id true addressof:fun 
 281    remote_sem release 
 282    set_void_result 
 283   
 284  export remote 
 285   
 286   
 287 
 
 288   
 289  module "/pliant/language/stream.pli" 
 290  module "/pliant/admin/file.pli" 
 291   
 292   
 293  method item is_one_of set -> c 
 294    arg Str item set ; arg CBool c 
 295    := (" "+set+" " search " "+item+" " -1)<>(-1) 
 296   
 297   
 298  method stream file_send filename options -> status 
 299    arg_rw Stream stream ; arg Str filename options ; arg Status status 
 300    var FileInfo info := file_query filename standard 
 301    if info=undefined 
 302      stream writeline "" 
 303      if (options option "recover") 
 304        stream writeline "failure" 
 305      return failure 
 306    stream writeline "datetime "+(string info:datetime) 
 307    stream writeline "size "+(string info:size) 
 308    stream writeline "" 
 309    var Intn remain := info size 
 310    (var Stream data) open filename in+safe 
 311    while remain>and { var Int step := raw_copy data stream 1 (cast (shunt remain<2^22 remain 2^22) Int) ; step>0 } 
 312      remain -= step 
 313    status := shunt remain=0 success failure 
 314    if (options option "recover") 
 315      if remain>0 
 316        var Address buf := memory_zallocate 4096 null 
 317        while remain>and stream=success 
 318          var Int step := cast (shunt remain<4096 remain 4096) Int 
 319          stream raw_write buf step 
 320          remain -= step 
 321        memory_free buf 
 322      stream writeline (shunt status=success "success" "failure") 
 323    else 
 324      if status=failure 
 325        stream error "Failed to send file "+filename 
 326    if stream=failure 
 327      status := failure 
 328   
 329  method stream file_send filename -> status 
 330    arg_rw Stream stream ; arg Str filename ; arg Status status 
 331    status := stream file_send filename "" 
 332   
 333   
 334  method stream file_tree_send path options -> status 
 335    arg_rw Stream stream ; arg Str path options ; arg Status status 
 336    status := success 
 337    var Array:FileInfo files := file_list path standard+recursive+relative 
 338    for (var Int i) files:size-1 
 339      stream writeline "file "+(string files:i:name) 
 340      if (stream file_send path+files:i:name options)=failure 
 341        status := failure 
 342    stream writeline "end of file tree" 
 343   
 344  method stream file_tree_send path -> status 
 345    arg_rw Stream stream ; arg Str path ; arg Status status 
 346    status := stream file_tree_send path "" 
 347   
 348   
 349  method stream file_receive filename options -> status 
 350    arg_rw Stream stream ; arg Str filename options ; arg Status status 
 351    var Intn remain := 0 ; var DateTime dt := undefined 
 352    while { var Str := stream readline ; l<>"" } 
 353      parse word:"size" remain 
 354      parse word:"datetime" dt 
 355    var Str temp := file_temporary filename options 
 356    (var Stream data) open temp out+safe 
 357    while remain>and { var Int step := raw_copy stream data 1 (cast (shunt remain<2^22 remain 2^22) Int) ; step>0 } 
 358      remain -= step 
 359    status := shunt remain=0 success failure 
 360    if data:close=failure 
 361      status := failure 
 362    if (options option "recover") 
 363      if stream:readline<>"success" 
 364        status := failure 
 365    if status=success 
 366      file_configure temp "datetime "+string:dt 
 367      file_move temp filename 
 368    else 
 369      file_delete temp 
 370   
 371  method stream file_receive filename -> status 
 372    arg_rw Stream stream ; arg Str filename ; arg Status status 
 373    status := stream file_receive filename "" 
 374   
 375   
 376  method stream file_tree_receive path options -> status 
 377    arg_rw Stream stream ; arg Str path options ; arg ExtendedStatus status 
 378    status := success 
 379    while { var Str := stream readline ; parse word:"file" (var Str filename) } 
 380      file_tree_create path+filename 
 381      if (stream file_receive path+filename options)=failure 
 382        status := failure "File '"+filename+"' is corrupted" 
 383    if l<>"end of file tree" and status=success 
 384      status := failure "File tree is corrupted" 
 385     
 386  method stream file_tree_receive path -> status 
 387    arg_rw Stream stream ; arg Str path ; arg ExtendedStatus status 
 388    status := stream file_tree_receive path "" 
 389   
 390   
 391  export '. is_one_of' 
 392  export '. file_send' '. file_tree_send' '. file_receive' '. file_tree_receive' 
 393