/pliant/language/schedule/queue.pli
 
 1  # Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx 
 2  # 
 3  # This program is free software; you can redistribute it and/or 
 4  # modify it under the terms of the GNU General Public License version 2 
 5  # as published by the Free Software Foundation. 
 6  # 
 7  # This program is distributed in the hope that it will be useful, 
 8  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 9  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 10  # GNU General Public License for more details. 
 11  # 
 12  # You should have received a copy of the GNU General Public License 
 13  # version 2 along with this program; if not, write to the Free Software 
 14  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA. 
 15   
 16  scope "/pliant/language/" "/pliant/install/" 
 17  module "/pliant/install/ring3.pli" 
 18   
 19   
 20  type QueueNode 
 21    field DelayedAction task 
 22    field Int index 
 23    field DelayedAction post 
 24    field Link:QueueNode next 
 25   
 26   
 27  type Queue 
 28    field FastSem sem 
 29    field Int active runnings 
 30    field Int count mini maxi 
 31    field Link:QueueNode first 
 32    field (Pointer Link:QueueNode) last 
 33    field ThreadQueue queue 
 34   
 35    field ThreadQueue pre_queue 
 36    field ThreadQueue pre_queue2 
 37   
 38    field FastSem post_sem 
 39    field Int post_current_index post_next_index 
 40    field CBool post_active post_running 
 41    field Int post_count 
 42    field Link:QueueNode post_first 
 43    field ThreadQueue post_queue 
 44   
 45    field CBool shy 
 46    field ErrorRecord error_record 
 47    field FastSem error_record_sem 
 48     
 49   
 50   
 51   
 52  function build  q 
 53    arg_w Queue q 
 54    active := 0 
 55    first :> null map QueueNode 
 56    last :>> first 
 57    post_first :> null map QueueNode 
 58   
 59   
 60  if pliant_debugging_level>=1 
 61    function destroy q 
 62      arg_w Queue q 
 63      check q:active="The queue is still active" 
 64      check (addressof q:first)=null "The queue is not empty" 
 65      check (addressof q:post_first)=null "The post queue is not empty" 
 66   
 67   
 68  function execute1 a f 
 69    arg Address a ; arg Function f 
 70    indirect 
 71   
 72  function queue_post_thread q 
 73    arg_rw Pointer:Queue q 
 74    var ErrorRecord e 
 75    error_push_record error_filter_all 
 76    while true 
 77      q:post_sem request 
 78      var (Pointer Link:QueueNode) n2 :>> post_first 
 79      while addressof:n2<>null and n2:index<>q:post_current_index 
 80        n2 :>> n2 next 
 81      if addressof:n2<>null 
 82        var Link:QueueNode :> n2 ; n2 :> next 
 83        post_current_index := q:post_current_index+1 
 84        post_count := q:post_count-1 
 85        if q:post_count<q:mini and not q:pre_queue2:is_empty 
 86          q:pre_queue2 restart_some_threads 1 
 87        q:post_sem release 
 88        execute1 n:post:parameter n:post:function 
 89        :> null map QueueNode 
 90      eif q:post_active 
 91        q:post_queue add_current_thread 0 1 
 92        q:post_sem release 
 93        stop_current_thread 
 94      else 
 95        post_running := false 
 96        q:post_queue restart_some_threads 1 
 97        q:post_sem release 
 98        error_pull_record e 
 99        return 
 100   
 101   
 102  function error_catch q 
 103    arg_rw Queue q 
 104    var Pointer:ErrorRecord :> error_top_record 
 105    if e:id<>error_id_noerror 
 106      q:error_record_sem request 
 107      error_propagate q:error_record 
 108      q:error_record_sem release 
 109      id := error_id_noerror 
 110   
 111   
 112  function execute q n 
 113    arg_rw Queue q ; arg_rw QueueNode n 
 114    if q:error_record:id<>error_id_noerror and q:shy 
 115      return 
 116    execute1 n:task:parameter n:task:function 
 117    error_catch q 
 118    n:task:parameter := null 
 119    if (addressof n:post:function)<>null 
 120      q:post_sem request 
 121      next :> post_first ; post_first :> n 
 122      post_count := q:post_count+1 
 123      if n:index=q:post_current_index 
 124        if not q:post_running 
 125          var DelayedAction da 
 126          da function :> the_function queue_post_thread Pointer:Queue 
 127          var (Link Pointer:Queue) ptr :>> new Pointer:Queue ; ptr :> q 
 128          da parameter := addressof Pointer:Queue ptr 
 129          post_running := true 
 130          if not run_thread:da 
 131            var CBool loop := true 
 132            while loop 
 133              var (Pointer Link:QueueNode) n2 :>> post_first 
 134              while addressof:n2<>null and n2:index<>q:post_current_index 
 135                n2 :>> n2 next 
 136              if addressof:n2<>null 
 137                var Link:QueueNode now :> n2 ; n2 :> now next 
 138                post_current_index := q:post_current_index+1 
 139                if q:post_count<q:mini and not q:pre_queue2:is_empty 
 140                  q:pre_queue2 restart_some_threads 1 
 141                q:post_sem release 
 142                execute1 now:post:parameter now:post:function 
 143                error_catch  
 144                now :> null map QueueNode 
 145                q:post_sem request 
 146              else 
 147                loop := false 
 148            post_running := false 
 149        eif not q:post_queue:is_empty 
 150          q:post_queue restart_some_threads 1 
 151      q:post_sem release 
 152   
 153   
 154  function queue_execute_thread q 
 155    arg_rw Pointer:Queue q 
 156    var ErrorRecord e 
 157    error_push_record error_filter_all 
 158    while true 
 159      q:sem request 
 160      if (addressof q:first)<>null 
 161        var Link:QueueNode :> first ; first :> next 
 162        if (addressof q:first)=null 
 163          last :>> first 
 164        count := q:count-1 
 165        if q:count<q:mini and not q:pre_queue:is_empty 
 166          q:pre_queue restart_some_threads 1 
 167        q:sem release 
 168        execute n 
 169      eif q:active<>0 
 170        q:queue add_current_thread 0 1 
 171        q:sem release 
 172        stop_current_thread 
 173      else 
 174        runnings := q:runnings-1 
 175        if q:runnings=0 
 176          q:queue restart_some_threads 1 
 177        q:sem release 
 178        error_pull_record e 
 179        return 
 180   
 181   
 182  method q start nthreads mini maxi active shy 
 183    arg_rw Queue q ; arg Int nthreads mini maxi ; arg CBool active shy 
 184    check q:active=0 error_id_mismatch "The queue is already active" 
 185    if nthreads=0 
 186      active := 1 
 187      return 
 188    active := shunt active 3 2 ; runnings := 0 
 189    count := 0 ; mini := mini ; maxi := maxi 
 190    shy := shy ; q:error_record id := error_id_noerror 
 191    var DelayedAction da 
 192    da function :> the_function queue_execute_thread Pointer:Queue 
 193    var (Link Pointer:Queue) ptr :>> new Pointer:Queue ; ptr :> q 
 194    da parameter := addressof Pointer:Queue ptr 
 195    for (var Int i) nthreads 
 196      if run_thread:da 
 197        runnings := q:runnings+1 
 198    if q:runnings=0 
 199      active := 1 
 200    post_current_index := 0 ; post_next_index := 0 
 201    post_active := true ; post_running := false 
 202    post_count := 0 
 203   
 204   
 205  method q stop 
 206    arg_rw Queue q 
 207    check q:active<>0 error_id_mismatch "The queue is not active" 
 208    if q:active=1 
 209      active := 0 
 210      return 
 211    q:sem request 
 212    if q:active=3 
 213      active := 0 
 214      while (addressof q:first)<>null 
 215        var Link:QueueNode node :> first 
 216        first :> node next 
 217        if (addressof q:first)=null 
 218          last :>> first 
 219        count := q:count-1 
 220        q:sem release 
 221        execute node 
 222        q:sem request 
 223    else 
 224      active := 0 
 225      while (addressof q:first)<>null 
 226        q:pre_queue add_current_thread 0 1 
 227        q:sem release 
 228        stop_current_thread 
 229        q:sem request 
 230    while not q:queue:is_empty 
 231      q:queue restart_some_threads 1 
 232    var CBool wait := q:runnings>0 
 233    if wait 
 234      q:queue add_current_thread 0 1 
 235    q:sem release 
 236    if wait 
 237      stop_current_thread 
 238      q:sem request 
 239      q:sem release 
 240    check (addressof q:first)=null "The queue should be empty" 
 241    check q:runnings="The queue threads should have stopped" 
 242    q:post_sem request 
 243    post_active := false 
 244    if not q:post_queue:is_empty 
 245      q:post_queue restart_some_threads 1 
 246    wait := q:post_running 
 247    if wait 
 248      q:post_queue add_current_thread 0 1 
 249    q:post_sem release 
 250    if wait 
 251      stop_current_thread 
 252      q:post_sem request 
 253      q:post_sem release 
 254    check (addressof q:post_first)=null "The post queue should be empty" 
 255    check not q:post_running "The post queue thread should have stopped" 
 256    error_propagate q:error_record error_top_record 
 257       
 258   
 259  method q append task post 
 260    arg_rw Queue q ; arg DelayedAction task ; arg DelayedAction post 
 261    check q:active<>"The queue is not active" 
 262    if q:error_record:id<>error_id_noerror and q:shy 
 263      return 
 264    if q:active=1 
 265      execute1 task:parameter task:function 
 266      if addressof:post<>null 
 267        execute1 post:parameter post:function 
 268      return 
 269    var Link:QueueNode :> new QueueNode 
 270    task := task 
 271    if addressof:post<>null 
 272      post := post 
 273    next :> null map QueueNode 
 274    q:sem request 
 275    if addressof:post<>null 
 276      index := post_next_index ; post_next_index := q:post_next_index+1 
 277    last :> n ; last :>> next 
 278    count := q:count+1 
 279    if not q:queue:is_empty 
 280      q:queue restart_some_threads 1 
 281    if q:count<=q:maxi 
 282      q:sem release 
 283    eif q:active=3 
 284      :> first ; first :> next 
 285      q:sem release 
 286      execute n 
 287    else 
 288      q:pre_queue add_current_thread 0 1 
 289      q:sem release 
 290      stop_current_thread 
 291    if q:post_count>q:maxi 
 292      q:post_sem request 
 293      if q:post_count<=q:maxi 
 294        q:post_sem release 
 295      else 
 296        q:pre_queue2 add_current_thread 0 1 
 297        q:post_sem release 
 298        stop_current_thread 
 299   
 300   
 301  export Queue '. start' '. stop' '. append'