Invoke-ScriptAsync

My foray into the subject of PowerShell runspaces and runspace pools continues, and at this point I’ve gotten past the proof-of-concept stage and have something that resembles a usable function.

There are a few other runspace handling functions out there, and I learned much from studying them, but I really needed to sit down and write one as part of the learning process, and this is what I ended up with.

I wanted to take advantage of runspace pools, but they have the drawback of not providing a way to know when a particular instance has begun executing. This makes it difficult to specify and impose a timeout on stalled instances, and they can eventually bottle up your runspace pool if you can’t keep them cleaned out.

I solved this with a minor hijack of the script’s Debug stream to get a timestamp. Before it’s attached to a PowerShell object, the script is wrapped in code that sets $DebugPreference to ‘Continue’ and writes a debug message containing the current time. This is done in its own scope, so the preference setting won’t override whatever might be set in the script. Writing that message is the first thing the script does when it starts, and the script’s streams can be interrogated while it is running, so I periodically cycle through the instance objects, look for that timestamp in the Debug stream, and use it to determine whether the instance has been running too long.
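To make the trick concrete, here is a minimal sketch of the wrapping and polling outside of any runspace pool. The $UserScript, $Wrapped, $Pipe, and $Handle names are placeholders for illustration, and the 500 ms sleep only stands in for real work.

```powershell
# Hypothetical user script; any script block would do here.
$UserScript = { Start-Sleep -Milliseconds 500; 'done' }

# Wrap it so the first thing it does is write a start marker to the Debug stream.
# The preference is set inside its own scope (& { }) so it does not leak into the user script.
$Wrapped = @"
& {
    `$DebugPreference = 'Continue'
    Write-Debug "Start(Ticks) = `$((Get-Date).Ticks)"
}
& { $UserScript } @args
"@

$Pipe   = [powershell]::Create().AddScript($Wrapped)
$Handle = $Pipe.BeginInvoke()

# Streams can be read while the script runs, so wait for the marker to appear.
while ($Pipe.Streams.Debug.Count -eq 0 -and -not $Handle.IsCompleted) {
    Start-Sleep -Milliseconds 50
}

# Turn the tick count in the marker back into a DateTime.
$Started = $null
if ($Pipe.Streams.Debug[0].Message -match '^Start\(Ticks\) = (\d+)$') {
    $Started = [datetime]::MinValue + [timespan]::FromTicks([long]($Matches[1]))
}

$Output = $Pipe.EndInvoke($Handle)
$Pipe.Dispose()
```

Once $Started is known, comparing it against (Get-Date) gives the elapsed run time for that instance, which is what the timeout check needs.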

Another feature is the ability to save a collection of PS objects that hold detailed information about each instance’s execution and the content of its Verbose, Warning, Error, and Debug streams. These can be used for debugging or reporting after the script blocks have finished.
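As a rough illustration of where that information comes from, the sketch below runs a throwaway script synchronously and copies its streams into a single report object. The script body and the $Report property names are made up for the example; the function itself records more fields than this.

```powershell
# Throwaway script that writes to several streams and returns one value.
$Pipe = [powershell]::Create().AddScript({
    $VerbosePreference = 'Continue'
    Write-Verbose 'starting work'
    Write-Warning 'disk nearly full'
    Write-Error   'something went wrong'
    'result'
}.ToString())

$Output = $Pipe.Invoke()

# Collect the run details and stream contents into one object for later reporting.
$Report = New-Object PSObject -Property @{
    State       = $Pipe.InvocationStateInfo.State
    HadErrors   = $Pipe.HadErrors
    Verbose     = @($Pipe.Streams.Verbose | ForEach-Object { $_.Message })
    Warning     = @($Pipe.Streams.Warning | ForEach-Object { $_.Message })
    Error       = @($Pipe.Streams.Error   | ForEach-Object { $_.Exception.Message })
    OutputCount = $Output.Count
}

$Pipe.Dispose()
```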

It’s relatively simple in terms of the arguments it accepts compared to some of the other functions out there. It takes two required arguments: a script block and an array of argument lists. For each argument list in the array, it creates a PowerShell object using the script block and that argument list, and hands it over to the runspace pool.
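Stripped of the timeout and reporting logic, that core loop might look something like the sketch below. The script block, the sample argument lists, and the pool size of 2 are assumptions for the example. Note that AddArgument passes each argument list as a single array parameter, which is why the sample script block indexes into $List.

```powershell
# Hypothetical script block and argument lists for illustration.
$ScriptBlock   = { param($List) '{0} x {1}' -f $List[0], $List[1] }
$ArgumentLists = @( @('alpha', 1), @('beta', 2), @('gamma', 3) )

# A pool that runs at most two instances at a time.
$Pool = [runspacefactory]::CreateRunspacePool(1, 2)
$Pool.Open()

# One PowerShell object per argument list, all sharing the pool.
$Jobs = foreach ($ArgList in $ArgumentLists) {
    $Pipe = [powershell]::Create().AddScript($ScriptBlock.ToString()).AddArgument($ArgList)
    $Pipe.RunspacePool = $Pool
    New-Object PSObject -Property @{ Pipe = $Pipe; Handle = $Pipe.BeginInvoke() }
}

# EndInvoke blocks until the instance has finished, then returns its output.
$Results = foreach ($Job in $Jobs) {
    $Job.Pipe.EndInvoke($Job.Handle)
    $Job.Pipe.Dispose()
}

$Pool.Close()
$Pool.Dispose()
```

Blocking on EndInvoke in order is the simplest way to collect results, but it gives no chance to time out a stalled instance, which is why the function polls instead.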

The result objects that contain the run details and stream information are kept in a hash table that you supply from the calling scope (so you still have them after the function exits). To save those objects, create an empty hash table and pass it to the ResultHash parameter.
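This works because a hash table is a reference type: the function adds entries to the same object the caller created. A small sketch of the pattern, using a hypothetical Add-Result helper that is not part of the function:

```powershell
# Hypothetical helper that fills a caller-supplied hash table, if one was passed.
function Add-Result {
    param(
        [string]$Key,
        [HashTable]$ResultHash
    )

    if ( $PSBoundParameters.ContainsKey('ResultHash') ) {
        $ResultHash[$Key] = New-Object PSObject -Property @{ State = 'Completed' }
    }
}

$Results = @{}                                   # created in the caller's scope
Add-Result -Key 'job1' -ResultHash $Results
Add-Result -Key 'job2' -ResultHash $Results
Add-Result -Key 'job3'                           # no hash table supplied, nothing saved

$SavedCount = $Results.Count
```

With the function in this post, the equivalent call would pass the same kind of empty hash table, along the lines of Invoke-ScriptAsync -ScriptBlock $ScriptBlock -ArgumentList $ArgumentLists -ResultHash $Results, and then inspect $Results.Values afterwards.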

Thanks to Boe Prox and Dave Wyatt for their help on this, and I look forward to their (and anyone else’s) feedback, suggestions, and constructive criticism.


function Invoke-ScriptAsync {
    [cmdletbinding()]
    param(
        [Parameter(Mandatory=$true,position=0)]
        [ScriptBlock] $ScriptBlock,
 
        [Parameter(Mandatory=$true)]
        [array]$ArgumentList,
 
        [Parameter()]
        [int]$MaxThreads = 10,
 
        [Parameter()]
        [int]$Timeout = 15,
 
        [Parameter()]
        [int]$RefreshInterval = 1,

        [Parameter()]
        [HashTable]$ResultHash
  )
 
$MaxRunTime = New-TimeSpan -Seconds $Timeout
 
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads)
$RunspacePool.Open()

$JobScript =
 @"
    &{ 
      `$DebugPreference = 'Continue'
      Write-Debug "Start(Ticks) = `$((get-date).Ticks)"
    }
 
    & { $ScriptBlock } @args
 
    &{
        `$DebugPreference = 'Continue' 
        Write-Debug "End(Ticks) = `$((get-date).Ticks)"
     }
"@
 
$SaveJobData = 
 {
   #Save Pipeline Streams and information to ResultHash

   $JobData = $ResultHash[$Job.Pipe.InstanceID]
   $JobData.Started = $Job.Started
   $JobData.Ended = $Job.Ended
   $JobData.HadErrors = $Job.Pipe.HadErrors
   $JobData.State = $Job.State
   $JobData.Error = $Job.Pipe.Streams.Error |
      foreach { $_.Exception.Message }
   $JobData.Warning = $Job.Pipe.Streams.Warning.Readall() | Out-String
             
   $Debug = $Job.Pipe.Streams.Debug
   if ($Debug.count -gt 2)
     { $JobData.Debug = $Debug[1..($Debug.count - 2)] | Out-String }
 
   $JobData.Verbose = $Job.Pipe.Streams.Verbose.Readall() | Out-String
 
   $JobData.Duration = '{0:f2} ms' -f ($Job.Ended - $Job.Started).totalmilliseconds

  if ($Job.State -ne 'Timeout')
    { $JobData.OutputCount = $Job.Pipe.EndInvoke($Job.Result).count }
                
 } #End Save Job data

 
$Sequence = 1 
 
$Jobs = 
  Foreach ($Argument in $ArgumentList)
   { 
     $Job = [powershell]::Create().
                          AddScript($JobScript).
                          AddArgument($argument)
 
     $Job.RunspacePool = $RunspacePool
 
     New-Object PSObject -Property @{
               Pipe     = $Job
               Result   = $Job.BeginInvoke()
               Args     = $argument
               Started  = $null
               Ended    = $null
               State    = $null
              }

     if ( $PSBoundParameters.ContainsKey('ResultHash') )
       { 
         $ResultHash[$Job.InstanceID] =        
           New-Object PSObject -Property @{
               Sequence    = $Sequence++
               Args        = $argument
               Started     = $null
               Ended       = $null
               Duration    = $null
               State       = $null
               HadErrors   = $null
               Verbose     = $null
               Warning     = $null
               Error       = $null
               Debug       = $null
               OutPutCount = $null
              }
        }                                                                          
  }
 
# Force an array so that .Count is reliable even when there is only one job
$Jobs = @($Jobs)
$Waiting = $Jobs 

While ( $Waiting )
  {
    foreach ($Job in  $Waiting )
      {
        if (
             ($Job.started -eq $null) -and
             ($job.pipe.Streams.Debug[0].Message -match 'Start')
            )
             {
               $StartTicks = $Job.pipe.Streams.Debug[0].Message -replace 'Start\(Ticks\) = (\d+)','$1'
               $Job.Started = [Datetime]::MinValue + [TimeSpan]::FromTicks($StartTicks)
             }
 
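        if ($Job.Result.IsCompleted)
          {
            # The End marker is missing if the script threw a terminating error,
            # so fall back to the current time rather than leave Ended unset.
            $LastDebug = $Job.pipe.Streams.Debug[-1].Message
            if ($LastDebug -match '^End\(Ticks\) = (\d+)$')
              { $Job.Ended = [Datetime]::MinValue + [TimeSpan]::FromTicks($Matches[1]) }
            else
              { $Job.Ended = Get-Date }
            $Job.State = $Job.pipe.InvocationStateInfo.State
            if ( $PSBoundParameters.ContainsKey('ResultHash') )
              { .$SaveJobData }
            $Job.Pipe.EndInvoke($Job.Result)
            $Job.Pipe.Dispose()
          } 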
 
        # elseif: a job that has already completed must not also be treated as timed out
        elseif ( ($Job.Started) -and
             (get-date) -gt ($Job.Started + $MaxRunTime))
          {
            Write-Warning "Job $($job.Pipe.InstanceId) using argument $($job.args) timed out."
            $Job.Ended = (Get-Date)
            $Job.Pipe.Stop()
            $Job.State = 'Timeout'
            if ( $PSBoundParameters.ContainsKey('ResultHash') )
              { .$SaveJobData }
            $Job.Pipe.Dispose()
           }
        }
 
          $Waiting = @($Jobs | Where-Object { $null -eq $_.Ended })

          $Progress = 
            @{
               Activity = 'Running Scripts'
               Status   = "Completed $($Jobs.count - $Waiting.count) of $($Jobs.count)"
               PercentComplete = ($Jobs.count - $Waiting.count)  / $Jobs.count * 100
             }

          Write-Progress @Progress

          Start-Sleep -Seconds $RefreshInterval
 }
    
 
  $RunSpacePool.Close()
  $RunSpacePool.Dispose()
 
  } # End function
 
 