Patch title: Release 94 bulk changes
Abstract:
File: /pliant/language/schedule/threads_engine.pli
Key:
    Removed line
    Added line
# Copyright  Hubert Tonneau  hubert.tonneau@pliant.cx
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License version 2
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# version 2 along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.

# scope "/pliant/"
module "/pliant/install/ring2.pli"
module "pentium.pli"

constant has_user_field true


if os_api="linux"
  constant pliant_suspend_signal os_SIGUSR1
  public
    gvar Address stack_base
    gvar Int stack_size := 1024*2^10
  constant thread_trace false
  public
    constant stack_size 1024*2^10 # must match start.s
    gvar Int stack_base

  function compute_stack_base
    var Int a_variable_on_the_stack
    stack_base := cast (cast addressof:a_variable_on_the_stack uInt)\(cast stack_size uInt)*(cast stack_size uInt) Address
    memory_limit_address := stack_base translate Byte -stack_size
  compute_stack_base


type ThreadHeader
  field Address variables_context
  field Int language_index
  if has_user_field
    field Str user
  if os_api="linux"
    field Int pid
  eif os_api="os2"
    field Int tid
  eif os_api="posix"
    field Int id
  eif os_api="win32"
    field Int handle
  field Int priority
  field Int restart_cost
  field Pointer:ThreadHeader next
  field DelayedAction action
  field Address address # used by 'execute'
  field Pointer:ThreadHeader list_next list_previous
  field FastSem action_sem
  field Pointer:ActionRecord top_action
  field Pointer:ErrorRecord top_error
  field ErrorRecord bottom_error
  if processor_is_pentium
    field uInt processor_counter_low processor_counter_high

export ThreadHeader '. variables_context' '. address'
if os_api="linux"
  check ThreadHeader:size<=256 # must match start.s

export ThreadHeader '. variables_context' '. language_index' '. address'
export '. list_next' '. action_sem' '. top_action'
if has_user_field
  export '. user'
if os_api="linux"
  export '. pid'
eif os_api="os2"
  export '. tid'
if processor_is_pentium
  export '. processor_counter_low' '. processor_counter_high'


method h setup
  arg_rw ThreadHeader h
  h language_index := 0
  if has_user_field
    Str build_instance (addressof h:user)
  h top_action :> null map ActionRecord
  h:bottom_error id := error_id_noerror
  h:bottom_error filter := error_filter_none
  h:bottom_error next :> null map ErrorRecord
  h top_error :> h bottom_error


public
  gvar FastSem thread_list_sem
  gvar ThreadHeader thread_list_pivot
thread_list_pivot list_next :> thread_list_pivot ; thread_list_pivot list_previous :> thread_list_pivot

function thread_insert_header h
  arg_rw ThreadHeader h
  thread_list_sem request
  h list_next :> thread_list_pivot list_next
  h list_previous :> thread_list_pivot
  thread_list_pivot:list_next list_previous :> h
  thread_list_pivot list_next :> h
  thread_list_sem release

function thread_remove_header h
  arg_rw ThreadHeader h
  thread_list_sem request
  h:list_previous list_next :> h list_next
  h:list_next list_previous :> h list_previous
  thread_list_sem release


#---------------------------------------------------------------------
#  starting and stopping threads


if os_api="linux"

  if false

    function stop_current_thread
      os_kill current_thread_header:pid os_SIGSTOP

    function restart_thread h
      arg ThreadHeader h
      os_kill h:pid os_SIGCONT

  else

    function stop_current_thread
      os_sigsuspend 0 0 .not. (2^(pliant_suspend_signal-1) .or. 2^(os_SIGINT-1) .or. 2^(os_SIGTERM-1) .or. 2^(os_SIGKILL-1))
      os_sigsetmask 2^(pliant_suspend_signal-1)

    function restart_thread h
      arg ThreadHeader h
      os_kill h:pid pliant_suspend_signal

    function exception_handler3 num
      arg Int num
      external_calling_convention
    gvar os_sigaction sa3
    sa3 sa_handler := (the_function exception_handler3 Int) executable
    entry_root addressof:(the_function exception_handler3 Int)
    function record_exception_handler3 parameter filehandle
      arg Address parameter ; arg Int filehandle
      if (os_sigaction pliant_suspend_signal sa3 (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux pliant_suspend_signal exception handler"
    record_exception_handler3 null 0
    gvar DelayedAction da3
    da3 function :> the_function record_exception_handler3 Address Int
    pliant_restore_actions append addressof:da3

eif os_api="posix"

  function pthread_self -> handle
    arg Int handle
    external os_libpthread_filename "pthread_self"

  function pthread_kill handle signal -> err
    arg Int handle signal err
    external os_libpthread_filename "pthread_kill"

  function stop_current_thread
    pthread_kill pthread_self os_SIGSTOP

  function restart_thread h
    arg ThreadHeader h
    pthread_kill h:id os_SIGCONT

eif os_api="win32"

  gvar Int tls

  function current_thread_header -> h
    arg_RW ThreadHeader h
    h :> os_TlsGetValue:tls map ThreadHeader

  function stop_current_thread
    var Pointer:ThreadHeader h :> current_thread_header
    while h:handle=(-1)
      os_yield
    os_SuspendThread h:handle

  function restart_thread h
    arg ThreadHeader h
    while (os_ResumeThread h:handle)<=0
      os_yield

eif os_api="os2"

  function stop_current_thread
    os_DosSuspendThread current_thread_header:tid

  function restart_thread h
    arg ThreadHeader h
    while (os_DosResumeThread h:tid)=ERROR_NOT_FROZEN
      os_yield


#---------------------------------------------------------------------
#  allocating thread stacks

if os_api="linux"

  public
    gvar Int thread_stacks_count := 1
  gvar FastSem threads_sem
  gvar Pointer:ThreadHeader first_zombie :> null map ThreadHeader
  gvar Pointer:ThreadHeader first_available :> null map ThreadHeader

  gvar Pointer:ThreadHeader recycling_first :> null map ThreadHeader
  gvar FastSem recycling_sem

  function record_zombie h
    arg_rw ThreadHeader h
    threads_sem request
    h next :> first_zombie ; first_zombie :> h
    threads_sem release

  function terminate_zombies
    if addressof:first_zombie=null
      return
    var Pointer:ThreadHeader lost :> null map ThreadHeader
    threads_sem request
    while addressof:first_zombie<>null
      var Pointer:ThreadHeader h :> first_zombie ; first_zombie :> h next
      threads_sem release
      var Int pid := os_waitpid h:pid (null map Int) 80000000h # waits for a cloned process
      threads_sem request
      if pid=h:pid
        h pid := 0
        h next :> first_available ; first_available :> h
      else
        h next :> lost ; lost :> h
    while addressof:lost<>null
      h :> lost ; lost :> h next
      h next :> first_zombie ; first_zombie :> h
    threads_sem release

  function allocate_stack -> h
    arg_RW ThreadHeader h
    threads_sem request
    if addressof:first_available<>null
      h :> first_available ; first_available :> h next
      threads_sem release
    else
      var Address bottom := stack_base translate Byte -thread_stacks_count*stack_size
      var Address bottom := cast stack_base.-.thread_stacks_count*stack_size Address
      if (cast bottom uInt)<(cast memory_base_address uInt)
        error_notify_fatal error_id_memory_starvation "Out of memory pages for thread stacks"
      memory_limit_address := bottom
      thread_stacks_count := thread_stacks_count+1
      threads_sem release
      var Address got := os_mmap bottom stack_size os_PROT_READ+os_PROT_WRITE os_MAP_PRIVATE+os_MAP_ANONYMOUS+os_MAP_FIXED+os_MAP_GROWSDOWN -1 0
      if got<>(cast -1 Address)
        h :> ((bottom translate Byte stack_size) translate ThreadHeader -1) map ThreadHeader
        h pid := 0
      else
        h :> null map ThreadHeader

  function free_stack h
    arg_rw ThreadHeader h
    threads_sem request
    h pid := 0
    h next :> first_available ; first_available :> h
    threads_sem release

  export record_zombie terminate_zombies allocate_stack free_stack

eif os_api="posix" or os_api="win32" or os_api="os2"

  gvar FastSem threads_sem
  gvar Pointer:ThreadHeader first_zombie :> null map ThreadHeader

  function record_zombie h
    arg_rw ThreadHeader h
    threads_sem request
    h next :> first_zombie ; first_zombie :> h
    threads_sem release

  function terminate_zombies
    if addressof:first_zombie=null
      return
    threads_sem request
    while addressof:first_zombie<>null
      var Pointer:ThreadHeader h :> first_zombie ; first_zombie :> h next
      if os_api="win32"
        threads_sem release
        while h:handle=(-1)
          os_yield
        os_CloseHandle h:handle
        threads_sem request
      entry_unlock addressof:h
    threads_sem release


#---------------------------------------------------------------------
#  running a new thread


gvar Int current_running_threads := 1
gvar Int maximum_running_threads := 1

function execute1 a f
  arg Address a ; arg Function f
  indirect

if os_api="linux"

  gvar ThreadHeader first_thread_header
  gvar Int first_thread_stack_top

 
  function current_thread_header -> h
    arg_RW ThreadHeader h
    has_side_effects
    var Int esp := i386_register i386_esp
    var Int stack_top := (esp .+. stack_size .-. 1) .and. .not. (stack_size-1)
    if stack_top=first_thread_stack_top
      h :> first_thread_header
    else
      h :> (((cast stack_top Address) translate ThreadHeader -1) map ThreadHeader)
    var Int stack_bottom := esp .and. .not. (cast stack_size-1 Int)
    h :> (cast stack_bottom .+. (stack_size-ThreadHeader:size) Address) map ThreadHeader

  function set_first_thread_info p fh
    arg Address p ; arg Int fh
    var Int esp := i386_register i386_esp
    first_thread_stack_top := (esp .+. stack_size .-. 1) .and. .not. (stack_size-1)
    first_thread_header pid := os_getpid
    stack_base := esp .and. .not. (cast stack_size-1 Int)
    var Pointer:ThreadHeader h :> current_thread_header
    h setup
    h pid := os_getpid
    #
    thread_stacks_count := 1
    first_zombie :> null map ThreadHeader
    first_available :> null map ThreadHeader
    recycling_first :> null map ThreadHeader
  set_first_thread_info null 0
  gvar DelayedAction da1
  da1 function :> the_function set_first_thread_info Address Int
  pliant_restore_actions insert_before pliant_restore_actions:first addressof:da1

  gcc_off

    function run_thread action -> success
      arg DelayedAction action ; arg CBool success
      if thread_trace
        console "run "+action:function:position+"[lf]"
      recycling_sem request
      if exists:recycling_first
        var Pointer:ThreadHeader h :> recycling_first
        recycling_first :> h next
        recycling_sem release
        var Pointer:ThreadHeader cth :> current_thread_header
        h variables_context := cth variables_context
        h language_index := cth language_index
        if has_user_field
          h user := cth user
        h action := action
        restart_thread h
        return true
      recycling_sem release
      terminate_zombies
      atomic_add current_running_threads 1
      maximum_running_threads := max maximum_running_threads current_running_threads
      var Pointer:ThreadHeader h :> allocate_stack
      if not exists:h
        atomic_add current_running_threads -1
        return false
      h variables_context := current_thread_header variables_context
      h setup
      var Pointer:ThreadHeader cth :> current_thread_header
      h variables_context := cth variables_context
      h language_index := cth language_index
      if has_user_field
        h user := cth user
      DelayedAction build_instance (addressof h:action)
      ErrorRecord build_instance (addressof h:bottom_error)
      h action := action
      h setup      
      var Int pid := os_clone 8F00h (addressof:h translate Byte -256)
      if pid=0
        h :> current_thread_header
        h pid := os_getpid
        os_sigsetmask 2^(pliant_suspend_signal-1)
        thread_insert_header h
        execute1 h:action:parameter h:action:function
        error_report
        while pliant_execution_phase<=execution_phase_run and (exists h:action:function)
          DelayedAction destroy_instance (addressof h:action)
          DelayedAction build_instance (addressof h:action)
          ActionRecord build_instance addressof:(var ActionRecord ar)
          action_push_record (var ActionRecord ar) "recycling"
          recycling_sem request
          h next :> recycling_first
          recycling_first :> h
          recycling_sem release
          stop_current_thread
          action_pull_record ar
          ActionRecord destroy_instance addressof:ar
          if (exists h:action:function)
            execute1 h:action:parameter h:action:function
          error_report
        thread_remove_header h
        ErrorRecord destroy_instance (addressof h:bottom_error)
        record_zombie h
        atomic_add current_running_threads -1
        os_exit 0
      eif pid=(-1)
        DelayedAction destroy_instance (addressof h:action)
        free_stack h
        atomic_add current_running_threads -1
        return false
      else
        return true

  function threads_shrink
    recycling_sem request
    while exists:recycling_first
      var Pointer:ThreadHeader h :> recycling_first
      recycling_first :> h next
      restart_thread h
    recycling_sem release
  
eif os_api="posix"

  function pthread_key_create handle destructor -> err
    arg_w Int handle ; arg Address destructor ; arg Int err
    external os_libpthread_filename "pthread_key_create"

  function pthread_setspecific handle value -> err
    arg Int handle ; arg Address value ; arg Int err
    external os_libpthread_filename "pthread_setspecific"

  function pthread_getspecific handle -> value
    arg Int handle ; arg Address value
    external os_libpthread_filename "pthread_getspecific"

  function pthread_create handle attr routine arg -> err
    arg_w Int handle ; arg Address attr routine arg ; arg Int err
    external os_libpthread_filename "pthread_create"

  gvar Int key
  gvar ThreadHeader first_thread_header

  function set_first_thread_info p fh
    arg Address p ; arg Int fh
    first_thread_header id := pthread_self
    pthread_key_create key null
    pthread_setspecific key addressof:first_thread_header
  set_first_thread_info null 0
  gvar DelayedAction da1
  da1 function :> the_function set_first_thread_info Address Int
  pliant_restore_actions insert_before pliant_restore_actions:first addressof:da1

  function current_thread_header -> h
    arg_RW ThreadHeader h
    h :> pthread_getspecific:key map ThreadHeader

  function thread_execute h
    arg_rw ThreadHeader h
    external_calling_convention
    pthread_setspecific key addressof:h
    h id := pthread_self
    thread_insert_header h
    execute1 h:action:parameter h:action:function
    thread_remove_header h
    error_report
    h:bottom_error context := null
    h:action parameter := null
    record_zombie h
    atomic_add current_running_threads -1

  gcc_off

    function run_thread action -> success
      arg DelayedAction action ; arg CBool success
      terminate_zombies
      atomic_add current_running_threads 1
      maximum_running_threads := max maximum_running_threads current_running_threads
      var Pointer:ThreadHeader h :> new ThreadHeader ; entry_lock addressof:h
      h variables_context := current_thread_header variables_context
      h action := action
      h setup
      var Pointer:ThreadHeader cth :> current_thread_header
      h variables_context := cth variables_context
      h language_index := cth language_index
      if has_user_field
        h user := cth user
      h action := action
      success := (pthread_create (var Int handle) null (the_function thread_execute ThreadHeader):executable addressof:h)=0
      if not success
        atomic_add current_running_threads -1
        entry_unlock addressof:h

  function threads_shrink
     void

eif os_api="win32"

  gvar ThreadHeader first_thread_header

  if pliant_debugging_level>=1
    module "/pliant/language/debug/report.pli"

  function set_first_thread_info p fh
    arg Address p ; arg Int fh
    os_DuplicateHandle os_GetCurrentProcess os_GetCurrentThread os_GetCurrentProcess first_thread_header:handle 0 false DUPLICATE_SAME_ACCESS
    if pliant_debugging_level>=1
      'first thread handle' := first_thread_header handle
    tls := os_TlsAlloc
    os_TlsSetValue tls addressof:first_thread_header
  set_first_thread_info null 0
  gvar DelayedAction da1
  da1 function :> the_function set_first_thread_info Address Int
  pliant_restore_actions insert_before pliant_restore_actions:first addressof:da1

  function thread_execute h
    arg_rw ThreadHeader h
    external_calling_convention
    os_TlsSetValue tls addressof:h
    thread_insert_header h
    execute1 h:action:parameter h:action:function
    thread_remove_header h
    error_report
    h:bottom_error context := null
    h:action parameter := null
    record_zombie h
    atomic_add current_running_threads -1

  function run_thread action -> success
    arg DelayedAction action ; arg CBool success
    terminate_zombies
    atomic_add current_running_threads 1
    maximum_running_threads := max maximum_running_threads current_running_threads
    var Pointer:ThreadHeader h :> new ThreadHeader ; entry_lock addressof:h
    h variables_context := current_thread_header variables_context
    h action := action
    h setup
    var Pointer:ThreadHeader cth :> current_thread_header
    h variables_context := cth variables_context
    h language_index := cth language_index
    if has_user_field
      h user := cth user
    h action := action
    h handle := -1
    h handle := os_CreateThread null 0 (the_function thread_execute ThreadHeader):executable addressof:h 0 (var Int tid)
    success := h:handle<>0
    if not success
      atomic_add current_running_threads -1
      entry_unlock addressof:h

  function threads_shrink
     void

eif os_api="os2"

  public
    gvar Int stack_size := 1024*2^10
  gvar ThreadHeader first_thread_header
  gvar Address thread_mem

  function set_first_thread_info p fh
    arg Address p ; arg Int fh
    os_DosGetInfoBlocks (var Pointer:os_TIB tib) (var Pointer:os_PIB pib)
    first_thread_header tid := tib:tib2:tid
    os_DosAllocThreadLocalMemory 1 thread_mem
    thread_mem map Address := addressof first_thread_header
  set_first_thread_info null 0
  gvar DelayedAction da1
  da1 function :> the_function set_first_thread_info Address Int
  pliant_restore_actions insert_before pliant_restore_actions:first addressof:da1

  function current_thread_header -> h
    arg_RW ThreadHeader h
    h :> thread_mem map Pointer:ThreadHeader

  if pliant_debugging_level>=1
    module "/pliant/language/debug/report.pli"

  function thread_execute h
    arg_rw ThreadHeader h
    external_calling_convention
    if pliant_debugging_level>=1
      var os_ExceptionHandler eh
      eh previous := null
      eh executable := (the_function 'os2 exception routine' os_ExceptionReport os_ExceptionHandler os_ExceptionContext Address -> Int) executable
      os_DosSetExceptionHandler eh
    os_DosGetInfoBlocks (var Pointer:os_TIB tib) (var Pointer:os_PIB pib)
    h tid := tib:tib2:tid
    thread_mem map Address := addressof h
    thread_insert_header h
    execute1 h:action:parameter h:action:function
    thread_remove_header h
    error_report
    h:bottom_error context := null
    h:action parameter := null
    record_zombie h
    atomic_add current_running_threads -1
    if pliant_debugging_level>=1
      os_DosUnsetExceptionHandler eh

  function run_thread action -> success
    arg DelayedAction action ; arg CBool success
    terminate_zombies
    atomic_add current_running_threads 1
    maximum_running_threads := max maximum_running_threads current_running_threads
    var Pointer:ThreadHeader h :> new ThreadHeader ; entry_lock addressof:h
    h variables_context := current_thread_header variables_context
    h action := action
    h setup
    var Pointer:ThreadHeader cth :> current_thread_header
    h variables_context := cth variables_context
    h language_index := cth language_index
    if has_user_field
      h user := cth user
    h action := action
    success := (os_DosCreateThread (var Int tid) (the_function thread_execute ThreadHeader):executable addressof:h 0 stack_size)=0
    if not success
      atomic_add current_running_threads -1
      entry_unlock addressof:h

  function threads_shrink
     void

if os_api="linux" or os_api="posix" or os_api="win32" or os_api="os2"
if os_api="posix" or os_api="win32" or os_api="os2"
  if addressof:current_thread_header<>addressof:first_thread_header
    error error_id_os "Threads interface seems to be buggy under "+os_api # +" "+'convert to string':(cast addressof:current_thread_header Int)+" "+'convert to string':(cast addressof:first_thread_header Int)


export run_thread threads_shrink current_thread_header
export current_running_threads maximum_running_threads


if os_api="linux"

  public
    gvar Address memory_semaphore_address

  if pliant_c_debugging_level>=2
    function nolock
      void
    entry_root addressof:(the_function nolock)

  function back_to_single_thread
    var Int me := os_getpid
    for (var Int i) 1 thread_stacks_count-1
      var Address bottom := stack_base translate Byte -i*stack_size
      var Pointer:ThreadHeader h :> ((bottom translate Byte stack_size) translate ThreadHeader -1) map ThreadHeader
      var Int pid := h pid
      if pid<>me and pid>0
        os_kill pid os_SIGKILL
    pid := first_thread_header pid
    if pid<>me and pid>0
      os_kill pid os_SIGKILL
    for (var Int i) thread_stacks_count-1 0 step -1
      var Int bottom := stack_base.-.i*stack_size
      var Pointer:ThreadHeader h :> (cast bottom .+. (stack_size-ThreadHeader:size) Address) map ThreadHeader
      if h:pid<>me and h:pid>0
        os_kill h:pid os_SIGKILL
    memory_semaphore_address map Int := 0
    if pliant_c_debugging_level>=2
      pliant_entry_lock_hook := the_function:nolock executable ; pliant_entry_unlock_hook := the_function:nolock executable

  export back_to_single_thread

  if pliant_debugging_level=0

    gvar CBool first_time := true

    function error_fatal_hook id message
      arg Int id ; arg Str message
      console message ; console "[lf]"
      if first_time
        first_time := false
        back_to_single_thread
      os_exit (min id (addressof:error_id_user map Int))

    entry_root addressof:(the_function error_fatal_hook Int Str)
    pliant_error_fatal_hook := (the_function error_fatal_hook Int Str) executable

    function exception_handler2 num mark1 mark2 mark3 mark4 edi esi ebp esp ebx edx ecx eax drop1 drop2 eip
      arg Int num mark1 mark2 mark3 mark4 edi esi ebp esp ebx edx ecx eax drop1 drop2 eip
      external_calling_convention
      if first_time
        first_time := false
        back_to_single_thread
        console "exception " ; console 'convert to string':num ; console "[lf]"
      os_exit 4

    gvar os_sigaction sa
    sa sa_handler := (the_function exception_handler2 Int Int Int Int Int Int Int Int Int Int Int Int Int Int Int Int) executable
    entry_root addressof:(the_function exception_handler2 Int Int Int Int Int Int Int Int Int Int Int Int Int Int Int Int)

    function record_exception_handler parameter filehandle
      arg Address parameter ; arg Int filehandle
      # catching bugs
      if (os_sigaction os_SIGSEGV sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGSEGV exception handler"
      if (os_sigaction os_SIGBUS sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGBUS exception handler"
      if (os_sigaction os_SIGFPE sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGFPE exception handler"
      # catching terminal close
      if (os_sigaction os_SIGHUP sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGHUP exception handler"
      # catching Ctrl+C
      if (os_sigaction os_SIGINT sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGINT exception handler"
      # catching kill
      if (os_sigaction os_SIGTERM sa (null map os_sigaction))<>0
        error error_id_os "Failed to install Linux SIGTERM exception handler"
    record_exception_handler null 0
    gvar DelayedAction da
    da function :> the_function record_exception_handler Address Int
    pliant_restore_actions append addressof:da


#---------------------------------------------------------------------
#  multithreaded errors handling


function mt_action_push_record ar action
  arg_w ActionRecord ar ; arg Str action
  if (addressof ar:next)<>(cast -1 Address)
    error_notify_fatal error_id_unexpected "Action record pushed twice"
  if pliant_verbose_level_variable>=2
    console "[cr]"+(action 0 78)+" "
  ar action := action
  var Pointer:ThreadHeader h :> current_thread_header
  h:action_sem request 
  ar next :> h top_action
  h top_action :> ar
  h:action_sem release

function mt_action_pull_record ar
  arg_rw ActionRecord ar
  if (addressof ar:next)<>(cast -1 Address)
    var Pointer:ThreadHeader h :> current_thread_header
    while true
      h:action_sem request 
      var Pointer:ActionRecord top :> h top_action
      if not exists:top
        error_notify_fatal error_id_unexpected "Wrong action record pulled ("+ar:action+")"
      h top_action :> top next
      h:action_sem release
      top next :> (cast -1 Address) map ActionRecord
      if addressof:top=addressof:ar
        return

function mt_action_top_record -> a
  arg_RW ActionRecord a
  a :> current_thread_header top_action


function mt_error_push_record e filter
  arg_w ErrorRecord e ; arg ErrorID filter
  if (addressof e:next)<>(cast -1 Address)
    error_notify_fatal error_id_unexpected "Error record pushed twice"
  e id := error_id_noerror
  e filter := filter
  var Pointer:ThreadHeader h :> current_thread_header
  e next :> h top_error
  h top_error :> e

function mt_error_pull_record e
  arg_rw ErrorRecord e
  var Pointer:ThreadHeader h :> current_thread_header
  if (addressof h:top_error)<>addressof:e
    error_notify_fatal error_id_corrupted "Wrong error record pulled"
  error_propagate e e:next
  h top_error :> e next
  error_report
  e next :> (cast -1 Address) map ErrorRecord

function mt_error_top_record -> e
  arg_RW ErrorRecord e
  e :> current_thread_header top_error

export mt_action_push_record mt_action_pull_record


entry_root addressof:(the_function mt_action_push_record ActionRecord Str)
entry_root addressof:(the_function mt_action_pull_record ActionRecord)
entry_root addressof:(the_function mt_action_top_record -> ActionRecord)
entry_root addressof:(the_function mt_error_push_record ErrorRecord ErrorID)
entry_root addressof:(the_function mt_error_pull_record ErrorRecord)
entry_root addressof:(the_function mt_error_top_record -> ErrorRecord)
function activate_mt_hooks
  var Pointer:ThreadHeader h :> current_thread_header
  h top_action :> action_top_record
  h top_error :> error_top_record
  pliant_action_push_record_hook := (the_function mt_action_push_record ActionRecord Str) executable
  pliant_action_pull_record_hook := (the_function mt_action_pull_record ActionRecord) executable
  pliant_action_top_record_hook := (the_function mt_action_top_record -> ActionRecord) executable
  pliant_error_push_record_hook := (the_function mt_error_push_record ErrorRecord ErrorID) executable
  pliant_error_pull_record_hook := (the_function mt_error_pull_record ErrorRecord) executable
  pliant_error_top_record_hook := (the_function mt_error_top_record -> ErrorRecord) executable
activate_mt_hooks

function restore_mt_hooks p fh
  arg Address p ; arg Int fh
  var Pointer:ThreadHeader h :> current_thread_header
  current_running_threads := 1
  thread_list_pivot list_next :> thread_list_pivot ; thread_list_pivot list_previous :> thread_list_pivot
  thread_insert_header h
gvar DelayedAction restore
restore function :> the_function restore_mt_hooks Address Int
pliant_restore_actions insert_after pliant_restore_actions:first addressof:restore

function do_nothing drop
  arg Address drop

function shutdown_threads p
  arg Address p
  if os_api="linux"
    while { recycling_sem request ; exists recycling_first }
      var Pointer:ThreadHeader h :> recycling_first
      recycling_first :> h next
      recycling_sem release
      h:action function :> the_function do_nothing Address
      restart_thread h
      os_yield
    recycling_sem release
  terminate_zombies
gvar DelayedAction shutdown
shutdown function :> the_function shutdown_threads Address
pliant_shutdown_actions append addressof:shutdown


#---------------------------------------------------------------------
#  handling waiting threads queues


type ThreadQueue
  field Pointer:ThreadHeader first

function build  q
  arg_w ThreadQueue q
  q first :> null map ThreadHeader

method q is_empty -> empty
  arg ThreadQueue q ; arg CBool empty
  empty := (addressof q:first)=null


method queue add_current_thread priority restart_cost
  arg_rw ThreadQueue queue ; arg Int priority ; arg Int restart_cost
  var (Pointer Pointer:ThreadHeader) q :>> queue first
  while addressof:q<>null and q:priority>=priority
    q :>> q next
  var Pointer:ThreadHeader h :> current_thread_header
  h priority := priority
  h restart_cost := restart_cost
  h next :> q ; q :> h

method queue restart_some_threads quantity
  arg_rw ThreadQueue queue ; arg Int quantity
  var Pointer:ThreadHeader h :> queue first
  if addressof:h<>null
    var Int q := h restart_cost
    var Pointer:ThreadHeader stop :> h next
    restart_thread h
    while addressof:stop<>null and q+stop:restart_cost<=quantity
      h :> stop
      q += h restart_cost
      stop :> h next
      restart_thread h
    queue first :> stop

if true
  method queue restart_some_threads quantity variable value
    arg_rw ThreadQueue queue ; arg Int quantity ; arg_rw uInt variable ; arg uInt value
    if true
      var Pointer:ThreadHeader start :> queue first
      if addressof:start<>null
        var Int q := start restart_cost
        var Pointer:ThreadHeader stop :> start next
        while addressof:stop<>null and q+stop:restart_cost<=quantity
          q += stop restart_cost
          stop :> stop next
        queue first :> stop
        variable := value
        while addressof:start<>addressof:stop
          var Pointer:ThreadHeader h :> start
          start :> h next
          restart_thread h
      else
        variable := value
    else
      queue restart_some_threads quantity
      variable := value


export stop_current_thread restart_thread
export ThreadQueue '. is_empty' '. add_current_thread' '. restart_some_threads'