Invoke-ScriptAsync V2

Added the option to take the scriptblock arguments as either a collection of argument lists or a collection of parameter hashes to be splatted to the script block.

The argument lists or hash tables can be passed to the function either through the -InputObject parameter or from the pipeline. The -InputType parameter specifies which type (ArgumentList or Parameters) the input objects are.
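One detail worth keeping in mind when piping argument lists: the pipeline unrolls arrays, so a single argument list arrives as separate items unless it is wrapped with the unary comma operator. This standalone sketch uses a hypothetical `Show-Input` helper (not part of Invoke-ScriptAsync) to show what each pipeline item looks like:

```powershell
# Hypothetical helper: reports how many elements each pipeline item carries.
function Show-Input {
    process { '{0} item(s): {1}' -f @($_).Count, ($_ -join ',') }
}

# An array of argument lists: only the outer array is unrolled,
# so each inner list arrives intact as one pipeline item.
@(1, 2), @(3, 4) | Show-Input

# A lone argument list is unrolled into individual values.
@(1, 2) | Show-Input

# The unary comma wraps the list so it travels as a single item.
, @(1, 2) | Show-Input
```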

The default is ‘ArgumentList’, which is intended for relatively simple script blocks that use $args or that bind an argument list to positional parameters. More complex scenarios can be handled with the ‘Parameters’ input type, by passing hash tables of named parameters and values to the script block.
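The difference between the two input types comes down to how PowerShell binds splatted values. A minimal sketch, using a hypothetical `$Sample` script block that is not from the post: an array binds by position, while a hash table binds by name, so order does not matter and omitted parameters keep their defaults.

```powershell
# Hypothetical script block with two parameters, one of them defaulted.
$Sample = {
    param($ComputerName, $Port = 80)
    "${ComputerName}:$Port"
}

# 'ArgumentList' style: values bind by position.
$ArgList = @('server01', 443)
& $Sample @ArgList          # server01:443

# 'Parameters' style: values bind by name, in any order.
$ParamHash = @{ Port = 8080; ComputerName = 'server02' }
& $Sample @ParamHash        # server02:8080

# Omitted named parameters fall back to their defaults.
$Partial = @{ ComputerName = 'server03' }
& $Sample @Partial          # server03:80
```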



function Invoke-ScriptAsync {

  [cmdletbinding()]

  param(
          #Script block to execute
          [Parameter(Mandatory)]
          [ScriptBlock] $ScriptBlock,
 
          #Array of argument lists or parameter hashes for the script block
          [Parameter(ValueFromPipeline)]
          [Object[]]$InputObject,

          #Type of input objects (argument lists or parameter hashes)
          [Parameter()]
          [ValidateSet('ArgumentList','Parameters')]
          [string]$InputType = 'ArgumentList',
      
          #Maximum number of threads to run
          [Parameter()]
          [int]$MaxThreads = 10,
 
          #Thread timeout (in seconds)
          [Parameter()]
          [int]$Timeout = 15,
       
          #Update refresh interval (in seconds)
          [Parameter()]
          [int]$RefreshInterval = 1,

          #Hash table reference to use for storing result objects
          [Parameter()]
          [HashTable]$ResultHash
      )

 Begin{ 

  $MaxRunTime = New-TimeSpan -Seconds $Timeout
 
  $RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
  $RunspacePool.Open()

  $JobScript =
 @"
    &{ 
      `$DebugPreference = 'Continue'
      Write-Debug "Start(Ticks) = `$((get-date).Ticks)"
    }
 
    & { $ScriptBlock } @args
 
    &{
        `$DebugPreference = 'Continue' 
        Write-Debug "End(Ticks) = `$((get-date).Ticks)"
     }
"@
 
  $SaveJobData = 
   {
     #Save Pipeline Streams and information to ResultHash

     $JobData = $ResultHash[$Job.Pipe.InstanceID]

     $JobData.Started   = $Job.Started
     $JobData.Ended     = $Job.Ended
     $JobData.HadErrors = $Job.Pipe.HadErrors
     $JobData.State     = $Job.State
     $JobData.Verbose   = $Job.Pipe.Streams.Verbose.Readall() | Out-String

     $JobData.Error     = $Job.Pipe.Streams.Error | foreach { $_.Exception.Message }
     $JobData.Warning   = $Job.Pipe.Streams.Warning.Readall() | Out-String

     $Debug = $Job.Pipe.Streams.Debug
     if ($Debug.count -gt 2)
     { $JobData.Debug   = $Debug[1..($Debug.count - 2)] | Out-String }

     $JobData.Duration  = '{0:f2} ms' -f ($Job.Ended - $Job.Started).totalmilliseconds

    if ($Job.State -ne 'Timeout')
      { $JobData.OutputCount = $Job.Pipe.EndInvoke($Job.Result).count }
                
   } #End Save Job data

  if ($PSBoundParameters.ContainsKey('InputObject') )
    { $ScriptArgs = [Collections.ArrayList]@($InputObject) }

   else {$ScriptArgs = [Collections.ArrayList]@() }

 } #End Begin block

 Process {

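 #Collect every pipeline item, including falsy values such as 0 or ''
 if ($PSCmdlet.MyInvocation.ExpectingInput)
   { [void]$ScriptArgs.Add($_) }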

 } #End Process Block

 End {

  $Sequence = 0 

  $Jobs = 
    Foreach ($ScriptArg in $ScriptArgs)
     { 
       $Sequence++

       #Using argument list
       if ( $InputType -eq 'ArgumentList' )
         { 
          $Job = [powershell]::Create().
                 AddScript($JobScript).
                 AddArgument($ScriptArg)
          $Job.RunspacePool = $RunspacePool
         }

       #Using parameter hash
       if ( $InputType -eq 'Parameters' )
         { 
           $Job = [powershell]::Create().
                  AddScript($JobScript).
                  AddParameters($ScriptArg)
           $Job.RunspacePool = $RunspacePool
         }

      [PSCustomObject]@{
         Pipe     = $Job
         Result   = $Job.BeginInvoke()
         Started  = $null
         Ended    = $null
         State    = $null
         Sequence = $Sequence
        }

     if ( $PSBoundParameters.ContainsKey('ResultHash') )
       { 
        $ResultHash[$Job.InstanceID] =        
          [PSCustomObject]@{
            Sequence    = $Sequence
            Args        = $ScriptArg | Out-String
            Started     = $null
            Ended       = $null
            Duration    = $null
            State       = $null
            HadErrors   = $null
            Verbose     = $null
            Warning     = $null
            Error       = $null
            Debug       = $null
            OutPutCount = $null
           }
        }
  }# End Job creation (foreach script argument)


  $Waiting = $Jobs 

  While ( $Waiting )
    {
     foreach ($Job in  $Waiting )
      {
        #New job started - record start time
        if (
             ($Job.started -eq $null) -and
             ($job.pipe.Streams.Debug[0].Message -match 'Start')
            )
             {
               $StartTicks = 
                $Job.pipe.Streams.Debug[0].Message -replace 'Start\(Ticks\) = (\d+)','$1'
               $Job.Started = [Datetime]::MinValue + [TimeSpan]::FromTicks($StartTicks)
             }
        
        #Job completed - record end time and job data
        if ($Job.Result.IsCompleted)
          {
            $EndTicks = 
             $Job.pipe.Streams.Debug[-1].Message -replace 'End\(Ticks\) = (\d+)','$1'
            $Job.Ended = [Datetime]::MinValue + [TimeSpan]::FromTicks($EndTicks)
            $Job.State = $Job.pipe.InvocationStateInfo.State
            if ( $PSBoundParameters.ContainsKey('ResultHash') )
              { .$SaveJobData }
            $Job.Pipe.EndInvoke($Job.Result)
            $Job.Pipe.Dispose()
          } 
        
        #Job running, exceeded max run time. Record job data and stop thread.
        if ( ($Job.Started) -and
             (-not ($Job.Result.IsCompleted) ) -and
             (get-date) -gt ($Job.Started + $MaxRunTime))
          {
            Write-Warning "Job sequence number $($job.sequence) timed out."
            $Job.Ended = (Get-Date)
            $Job.Pipe.Stop()
            $Job.State = 'Timeout'
            if ( $PSBoundParameters.ContainsKey('ResultHash') )
              { .$SaveJobData }
            $Job.Pipe.Dispose()
           }
        } # end foreach waiting job 
          
          #Done checking waiting jobs. Get new waiting list and write progress

          #Jobs left to finish or time out
          $Waiting = $Jobs | Where { $_.Ended -eq $null }

          $Progress = 
            @{
               Activity = 'Running Scripts'
               Status   = "Completed $($Jobs.count - $Waiting.count) of $($Jobs.count)"
               PercentComplete = ($Jobs.count - $Waiting.count)  / $Jobs.count * 100
             }

          Write-Progress @Progress

          Start-Sleep -Seconds $RefreshInterval

     } # End while jobs waiting
    
    #All jobs finished.  Clean up runspace pool

    $RunSpacePool.Close()
    $RunSpacePool.Dispose()

 } # End End block
 
} # End function
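The End block's polling loop comes down to three calls on each `[powershell]` instance: `BeginInvoke()`, a check of the handle's `IsCompleted`, and `Stop()` once the deadline has passed. A stripped-down sketch of that pattern for a single job, assuming a 2-second timeout and a deliberately slow script (both chosen only for illustration):

```powershell
$Pool = [runspacefactory]::CreateRunspacePool(1, 2)
$Pool.Open()

# Deliberately slow script so the timeout path is taken.
$Pipe = [powershell]::Create().AddScript('Start-Sleep -Seconds 30; "finished"')
$Pipe.RunspacePool = $Pool

$Handle   = $Pipe.BeginInvoke()
$Deadline = (Get-Date).AddSeconds(2)     # assumed timeout for this sketch

# Poll instead of blocking, as the function's While loop does.
while (-not $Handle.IsCompleted -and (Get-Date) -lt $Deadline) {
    Start-Sleep -Milliseconds 200
}

if ($Handle.IsCompleted) {
    $Output = $Pipe.EndInvoke($Handle)
    $State  = 'Completed'
}
else {
    $Pipe.Stop()                         # blocks until the pipeline has stopped
    $State  = 'Timeout'
}

$FinalState = $Pipe.InvocationStateInfo.State
$Pipe.Dispose()
$Pool.Close()
$Pool.Dispose()

"$State ($FinalState)"
```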
 

2 responses to “Invoke-ScriptAsync V2”

  1. Hi Rob!
    I have looked into this function.
    I like the idea of using streams to transport the data, but every stream has its own purpose.
    1. You contaminate the debug stream with data that is not intended for it.
    2. You have to use "prayer based parsing". PowerShell is about using objects and picking data out of them, not parsing it.
    3. If you provide a host for the runspace pool, messages are processed directly by the host, and you cannot catch the stream as a man in the middle (see my example below).
    If you do not provide a host, you have to program all the host features yourself and forward the streams to the original host yourself. That is annoying, error-prone and uncomfortable 🙂

    Don’t reinvent the wheel. Use a synchronized hashtable to share data between the parent process and the runspaces.
    You have to treat the synchronized hashtable very carefully, because under heavy use it can lead to race conditions, deadlocks and other threading bugs.

    I think you can use a synchronized hashtable in a thread-safe manner if you pay attention to the following rules:
    – Read operations should not harm the synchronized hashtable; write operations are the most critical.
    So write to the synchronized hashtable very, very rarely.
    – Write to the synchronized hashtable at only a few points in the code, to reduce race conditions from simultaneous writes.
    – Use only simple .NET value types for write operations. Types like String or Integer should always be thread-safe.
    (see on MSDN: Value Types and Reference Types http://msdn.microsoft.com/en-us/library/t63sy5hs.aspx )
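A rough sketch of the approach suggested here, assuming in-process runspaces (the default for a local runspace pool): one `[hashtable]::Synchronized()` instance is handed to every runspace with `AddArgument`, and each job writes exactly once, to its own key, with a simple Int64 value, which follows the rules above. The variable names are illustrative, not taken from either version of the function.

```powershell
# One synchronized hashtable, shared by reference with every runspace.
$Shared = [hashtable]::Synchronized(@{})

$Pool = [runspacefactory]::CreateRunspacePool(1, 4)
$Pool.Open()

$Jobs = foreach ($Id in 1..8) {
    $Pipe = [powershell]::Create().AddScript({
        param($Hash, $Id)
        # Single write per job, to a key no other job touches; the value is an Int64.
        $Hash[$Id] = (Get-Date).Ticks
    }).AddArgument($Shared).AddArgument($Id)
    $Pipe.RunspacePool = $Pool
    [pscustomobject]@{ Pipe = $Pipe; Handle = $Pipe.BeginInvoke() }
}

# Wait for every job, then release resources.
foreach ($Job in $Jobs) {
    $null = $Job.Pipe.EndInvoke($Job.Handle)
    $Job.Pipe.Dispose()
}
$Pool.Close()
$Pool.Dispose()

# The parent reads the start times directly, without parsing stream text.
$Shared.Keys | Sort-Object | ForEach-Object { '{0}: {1}' -f $_, ([datetime][int64]$Shared[$_]) }
```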

    Begin excerpt from Dave Wyatt

    You can modify the PowerShell variables within a runspace without causing problems, but if you modify the .NET object that the variable referred to, it’s not thread-safe.

    $someVariable = ‘New Value’ # is okay.
    $someVariable.SomeProperty = ‘New Value’ # is not.

    End excerpt from Dave Wyatt
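To make the distinction in this excerpt concrete: a synchronized hashtable makes each individual get or set thread-safe, but a read-modify-write such as incrementing a counter is two separate operations, so concurrent jobs can lose updates. A common remedy, shown here as a sketch rather than as part of either author's code, is to hold `[System.Threading.Monitor]` on the table's `SyncRoot` for the whole update. The key name `Total` is arbitrary.

```powershell
$Shared = [hashtable]::Synchronized(@{ Total = 0 })

$Pool = [runspacefactory]::CreateRunspacePool(1, 4)
$Pool.Open()

$Jobs = foreach ($n in 1..4) {
    $Pipe = [powershell]::Create().AddScript({
        param($Hash)
        foreach ($i in 1..1000) {
            # Lock the whole read-modify-write, not just the individual get and set.
            [System.Threading.Monitor]::Enter($Hash.SyncRoot)
            try     { $Hash['Total'] = $Hash['Total'] + 1 }
            finally { [System.Threading.Monitor]::Exit($Hash.SyncRoot) }
        }
    }).AddArgument($Shared)
    $Pipe.RunspacePool = $Pool
    [pscustomobject]@{ Pipe = $Pipe; Handle = $Pipe.BeginInvoke() }
}

foreach ($Job in $Jobs) {
    $null = $Job.Pipe.EndInvoke($Job.Handle)
    $Job.Pipe.Dispose()
}
$Pool.Close()
$Pool.Dispose()

$Shared['Total']   # 4000; without the lock, increments can be lost
```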

    You also handle the time calculation in a needlessly complicated way.
    You can instantiate a DateTime structure from a simple Int64.

    The following line:
    $Job.Started = [Datetime]::MinValue + [TimeSpan]::FromTicks($StartTicks)

    can be replaced by this:
    $Job.Started = [Datetime][Int64]$StartTicks

    Look at this 😉
    $Ticks = (Get-Date).Ticks
    [Datetime]$Ticks
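A quick check of the equivalence claimed above. Note that the value captured from the debug message by `-replace` is a string, so the `[Int64]` cast is needed before the `[Datetime]` conversion; casting the string straight to `[Datetime]` would attempt to parse it as a date instead.

```powershell
$Ticks      = (Get-Date).Ticks
# Simulate the debug message and extract the ticks, as the function does.
$StartTicks = "Start(Ticks) = $Ticks" -replace 'Start\(Ticks\) = (\d+)', '$1'

$Original   = [Datetime]::MinValue + [TimeSpan]::FromTicks($StartTicks)
$Simplified = [Datetime][Int64]$StartTicks

$Original -eq $Simplified     # True
$StartTicks.GetType().Name    # String
```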

    ### begin example CODE ###

    function Invoke-ScriptAsync {

      [cmdletbinding()]

      param(
        #Script block to execute
        [Parameter(Mandatory)]
        [ScriptBlock] $ScriptBlock,

        #Array of argument lists or parameter hashes for the script block
        [Parameter(ValueFromPipeline)]
        [Object[]]$InputObject,

        #Type of input objects (argument lists or parameter hashes)
        [Parameter()]
        [ValidateSet('ArgumentList','Parameters')]
        [string]$InputType = 'ArgumentList',

        #Maximum number of threads to run
        [Parameter()]
        [int]$MaxThreads = 10,

        #Thread timeout (in seconds)
        [Parameter()]
        [int]$Timeout = 15,

        #Update refresh interval (in seconds)
        [Parameter()]
        [int]$RefreshInterval = 1,

        #Hash table reference to use for storing result objects
        [Parameter()]
        [HashTable]$ResultHash
      )

     Begin{

      $MaxRunTime = New-TimeSpan -Seconds $Timeout

      $sessionState = [system.management.automation.runspaces.initialsessionstate]::CreateDefault()

      # create the synchronized Hashtable that all threads will share
      $SynchronizedHash = [hashtable]::Synchronized(@{})

      # add synchronized Hashtable to the initial session state, so all threads can use the same synchronized Hashtable
      $sessionState.Variables.Add((New-Object -TypeName System.Management.Automation.Runspaces.SessionStateVariableEntry -ArgumentList 'SynchronizedHash', $SynchronizedHash, ''))

      $RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads, $sessionState, $host)
      $RunspacePool.Open()

      $JobScript =
      @"
      & { $ScriptBlock } @args
    "@

      $SaveJobData =
       {
         #Save Pipeline Streams and information to ResultHash

         $JobData = $ResultHash[$Job.Pipe.InstanceID]

         $JobData.Started   = $Job.Started
         $JobData.Ended     = $Job.Ended
         $JobData.HadErrors = $Job.Pipe.HadErrors
         $JobData.State     = $Job.State
         $JobData.Verbose   = $Job.Pipe.Streams.Verbose.Readall() | Out-String

         $JobData.Error     = $Job.Pipe.Streams.Error | foreach { $_.Exception.Message }
         $JobData.Warning   = $Job.Pipe.Streams.Warning.Readall() | Out-String

         $Debug = $Job.Pipe.Streams.Debug
         if ($Debug.count -gt 2)
         { $JobData.Debug   = $Debug[1..($Debug.count - 2)] | Out-String }

         $JobData.Duration  = '{0:f2} ms' -f ($Job.Ended - $Job.Started).totalmilliseconds

         if ($Job.State -ne 'Timeout')
           { $JobData.OutputCount = $Job.Pipe.EndInvoke($Job.Result).count }

       } #End Save Job data

      if ($PSBoundParameters.ContainsKey('InputObject') )
        { $ScriptArgs = [Collections.ArrayList]@($InputObject) }

      else {$ScriptArgs = [Collections.ArrayList]@() }

     } #End Begin block

     Process {

      if ($_)
        { [void]$ScriptArgs.Add($_) }

     } #End Process Block

     End {

      $Sequence = 0

      $Jobs =
        Foreach ($ScriptArg in $ScriptArgs)
         {
           $Sequence++

           #Using argument list
           if ( $InputType -eq 'ArgumentList' )
             {
              $Job = [powershell]::Create().
                     AddScript($JobScript).
                     AddArgument($ScriptArg)
              $Job.RunspacePool = $RunspacePool
             }

           #Using parameter hash
           if ( $InputType -eq 'Parameters' )
             {
               $Job = [powershell]::Create().
                      AddScript($JobScript).
                      AddParameters($ScriptArg)
               $Job.RunspacePool = $RunspacePool
             }

          [PSCustomObject]@{
             Pipe     = $Job
             Result   = $Job.BeginInvoke()
             Started  = $null
             Ended    = $null
             State    = $null
             Sequence = $Sequence
            }

         if ( $PSBoundParameters.ContainsKey('ResultHash') )
           {
            $ResultHash[$Job.InstanceID] =
              [PSCustomObject]@{
                Sequence    = $Sequence
                Args        = $ScriptArg | Out-String
                Started     = $null
                Ended       = $null
                Duration    = $null
                State       = $null
                HadErrors   = $null
                Verbose     = $null
                Warning     = $null
                Error       = $null
                Debug       = $null
                OutPutCount = $null
               }
            }
      }# End Job creation (foreach script argument)

      $Waiting = $Jobs

      While ( $Waiting )
        {
         foreach ($Job in $Waiting )
          {
            #New job started - record start time
            if (
                 ($Job.started -eq $null) -and
                 ($job.pipe.Streams.Debug[0].Message -match 'Start')
                )
                 {
                   $StartTicks =
                    $Job.pipe.Streams.Debug[0].Message -replace 'Start\(Ticks\) = (\d+)','$1'
                   $Job.Started = [Datetime]::MinValue + [TimeSpan]::FromTicks($StartTicks)
                 }

            #Job completed - record end time and job data
            if ($Job.Result.IsCompleted)
              {
                $EndTicks =
                 $Job.pipe.Streams.Debug[-1].Message -replace 'End\(Ticks\) = (\d+)','$1'
                $Job.Ended = [Datetime]::MinValue + [TimeSpan]::FromTicks($EndTicks)
                $Job.State = $Job.pipe.InvocationStateInfo.State
                if ( $PSBoundParameters.ContainsKey('ResultHash') )
                  { .$SaveJobData }
                $Job.Pipe.EndInvoke($Job.Result)
                $Job.Pipe.Dispose()
              }

            #Job running, exceeded max run time. Record job data and stop thread.
            if ( ($Job.Started) -and
                 (-not ($Job.Result.IsCompleted) ) -and
                 (get-date) -gt ($Job.Started + $MaxRunTime))
              {
                Write-Warning "Job sequence number $($job.sequence) timed out."
                $Job.Ended = (Get-Date)
                $Job.Pipe.Stop()
                $Job.State = 'Timeout'
                if ( $PSBoundParameters.ContainsKey('ResultHash') )
                  { .$SaveJobData }
                $Job.Pipe.Dispose()
               }
            } # end foreach waiting job

              #Done checking waiting jobs. Get new waiting list and write progress

              #Jobs left to finish or time out
              $Waiting = $Jobs | Where { $_.Ended -eq $null }

              $Progress =
                @{
                   Activity = 'Running Scripts'
                   Status   = "Completed $($Jobs.count - $Waiting.count) of $($Jobs.count)"
                   PercentComplete = ($Jobs.count - $Waiting.count) / $Jobs.count * 100
                 }

              Write-Progress @Progress

              Start-Sleep -Seconds $RefreshInterval

         } # End while jobs waiting

        #All jobs finished. Clean up runspace pool

        $RunSpacePool.Close()
        $RunSpacePool.Dispose()

     } # End End block

    } # End function

    Invoke-ScriptAsync -ScriptBlock {"Starting $($Args[0])"; Start-Sleep $args[0]; "Ending $($args[0])"} -InputObject (9..12) -Timeout 10

    #### End Example CODE ####

    I like some ideas you presented here.

    I will come back to you with another approach in the next comment… stay tuned 😉

    By the way, I think it is better to discuss such questions in a forum than in the comments of a blog.

    Greets Peter Kriegel
    Founder member of the German speaking PowerShell Community
    http://www.PowerShell-Group.eu

  2. Thanks for the suggestions, and I look forward to seeing your proposal. I wanted to avoid the synchronized hash table because it does impose a performance penalty. Depending on the circumstances it can be considerable. It should be less significant in scenarios where you have relatively few, longer-running threads. If you’re using it to multi-thread a lot of relatively trivial tasks in a larger runspace pool, where jobs will cycle through the pool relatively quickly, just getting the start time recorded can become a substantial part of the run time for each job.

    If having a host is important, then this may not be a good solution. The target scenario for this solution was for tasks that will ultimately be put into production as scheduled jobs or tasks, where live host output isn’t particularly useful.

    While this does inject data into the stream that did not come from the argument script, it also removes that data from the returned job results. If the script writes its own debug messages, the injected messages will not be included in the returned job results.

    The point about writing an object to the debug stream is probably valid, though I disagree with the characterization “prayer based parsing”. If the function cannot be trusted to reliably read a timestamp from a script block the function itself created, the entire enterprise is questionable.
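    For reference, a minimal sketch of the mechanism under discussion, outside the function: a wrapper shaped like $JobScript writes start and end ticks to the debug stream, the caller converts them back to `[Datetime]` values, and the two injected markers are sliced off so only the script's own debug messages remain. The wrapped script (one debug message and a short sleep) is an assumed example, and the stream is copied to a plain array before indexing.

```powershell
# Wrapper in the same shape as $JobScript: start marker, user script, end marker.
$Wrapped = @'
&{ $DebugPreference = 'Continue'; Write-Debug "Start(Ticks) = $((Get-Date).Ticks)" }
& { $DebugPreference = 'Continue'; Write-Debug 'user message'; Start-Sleep -Milliseconds 250 }
&{ $DebugPreference = 'Continue'; Write-Debug "End(Ticks) = $((Get-Date).Ticks)" }
'@

$Pipe = [powershell]::Create().AddScript($Wrapped)
$null = $Pipe.Invoke()

# Copy the debug records into an ordinary array before indexing.
$Debug   = @($Pipe.Streams.Debug)
$Started = [Datetime][Int64]($Debug[0].Message -replace 'Start\(Ticks\) = (\d+)', '$1')
$Ended   = [Datetime][Int64]($Debug[$Debug.Count - 1].Message -replace 'End\(Ticks\) = (\d+)', '$1')

# Drop the two injected markers, keeping only the script's own debug output.
$UserDebug = $Debug[1..($Debug.Count - 2)] | ForEach-Object Message

'{0:f0} ms; user debug: {1}' -f ($Ended - $Started).TotalMilliseconds, ($UserDebug -join '; ')
$Pipe.Dispose()
```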
