Simple Parallel.For Implementation in Delphi 2007 without Generics and Anonymous Methods


Since Delphi XE7, the RTL has shipped with a Parallel.For (TParallel.For in System.Threading). The OmniThreadLibrary (OTL) offers one too, but it doesn't compile on D2007 because Delphi 2007 and earlier support neither generics nor anonymous methods. AsyncCalls supports D2006 to XE2, but it does not have a Parallel.For syntax.

Without anonymous methods and generics, implementing such a Parallel.For in D2007 is quite tricky. Still, a Parallel.For makes multithreaded code shorter and simpler, so it is useful for optimising the core parts of an algorithm. For example, simply replacing 4 for loops in an API with the parallel version brought the unit test TestRasterizationStadium20151010_tmp9DE7_x86 down from 1 min 25 secs to 1 min 17 secs.

The following introduces a simple implementation of ParallelFor under D2007 without generics and anonymous methods. It is very hard, if not impossible, to achieve the same fluent programming style as the Parallel.For from XE7 or OTL, but with this implementation it is still easy and efficient to benefit from multiple cores using a similar approach.

First, we need to define the parameters passed to the parallel function:

type
  TParallelParam = class
    public
      Data: Pointer;
      First: integer;
      Last: integer;

      constructor Create; overload;
      constructor Create(_Data: Pointer; _First, _Last: integer); overload;
      procedure SetValues(_First, _Last: integer); overload;
      procedure SetValues(_Data: Pointer; _First, _Last: integer); overload;
      destructor Destroy; override;
  end;

constructor TParallelParam.Create;
begin
  inherited Create;
end;

constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer);
begin
  inherited Create;
  Self.SetValues(_Data, _First, _Last);
end;

procedure TParallelParam.SetValues(_First, _Last: Integer);
begin
  Self.First := _First;
  Self.Last := _Last;
end;

procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer);
begin
  Self.Data := _Data;
  Self.First := _First;
  Self.Last := _Last;
end;

destructor TParallelParam.Destroy;
begin
  inherited;
end;

This TParallelParam has 3 fields that the parallel function, executed by each thread, can use. We define the parallel function prototype as:

type
  TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);

The Index parameter is different for each call, and Param holds all the data a thread needs to access. Data is an untyped pointer, so you can pack any parameters you like into a contiguous block of memory (a record, for example) and reach them through that pointer.
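As a minimal sketch of how several parameters might travel through the Data pointer, the program below packs them into a record. TScaleArgs, PScaleArgs and ScaleOne are hypothetical names invented for this illustration, and the class is a trimmed copy of TParallelParam so the program compiles on its own. The loop at the end is a sequential stand-in for the parallel driver, used only to show the data flow.

```pascal
program DataPointerDemo;
{$APPTYPE CONSOLE}

type
  // Trimmed copy of TParallelParam so this sketch compiles on its own.
  TParallelParam = class
  public
    Data: Pointer;
    First: integer;
    Last: integer;
  end;

  // Hypothetical parameter block; the names are for illustration only.
  PScaleArgs = ^TScaleArgs;
  TScaleArgs = record
    Values: array of Double; // shared data, one element per index
    Factor: Double;          // read-only setting
  end;

// Same signature as TParallelForFunc: one call handles one Index.
procedure ScaleOne(Index: integer; const Param: TParallelParam);
var
  Args: PScaleArgs;
begin
  Args := PScaleArgs(Param.Data); // cast the untyped pointer back
  Args^.Values[Index] := Args^.Values[Index] * Args^.Factor;
end;

var
  Args: TScaleArgs;
  Param: TParallelParam;
  i: integer;
begin
  SetLength(Args.Values, 8);
  for i := 0 to High(Args.Values) do
    Args.Values[i] := i;
  Args.Factor := 2.0;

  Param := TParallelParam.Create;
  try
    Param.Data := @Args;
    Param.First := 0;
    Param.Last := High(Args.Values);
    // Sequential stand-in for the parallel driver, to show the data flow only.
    for i := Param.First to Param.Last do
      ScaleOne(i, Param);
  finally
    Param.Free;
  end;
  WriteLn(Args.Values[3]:0:1);
end.
```

Each call writes only to its own element of Values and only reads Factor, so a callback shaped like this would need no locking when the calls run on different threads.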

We also need to wrap Classes.TThread to allow passing these parameters:

type
  TWorkerThread = class(TThread)
    public
      _Method: TParallelForFunc;
      _Param: TParallelParam;
      _Index: integer;
      Done: boolean;

      constructor Create(CreateSuspended: Boolean); overload;
      destructor Destroy; override;
    protected
      procedure Execute; override;
  end;

procedure TWorkerThread.Execute;
begin
  try
    Self._Method(_Index, Self._Param);
  finally
    Self.Terminate;
    Self.Done := True;
  end;
end;

constructor TWorkerThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  Self.Done := False;
end;

destructor TWorkerThread.Destroy;
begin
  inherited Destroy;
end;

With all of the above in place, we can implement ParallelFor:

procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer);
var
  flags: array of boolean;
  threads: array of TWorkerThread;
  tnum, len, i, cnt, j: integer;
begin
  len := Last - First + 1;
  if (len <= 0) then
  begin
    Raise Exception.Create('TParallelFor.Len = 0');
  end;
  SetLength(flags, len);
  tnum := ThreadNumber;
  if (tnum < 2) then
  begin
    tnum := 2;
  end;
  SetLength(threads, tnum);
  i := 0;
  cnt := 0; // job count finished
  while (cnt < len) do  // when not finished
  begin
    // find next available thread
    for j := 0 to tnum - 1 do
    begin
      // thread not assigned or finished
      if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then
      begin
        // get next job
        while (flags[i]) do
        begin                              
          i := (i + 1) mod len;
        end;
        // mark the job being done
        flags[i] := True;
        // increment the counter
        Inc(cnt);
        // start the job
        // release the finished thread (if any) before reusing the slot
        if (threads[j] <> nil) then
        begin
          threads[j].Free;
        end;
        threads[j] := TWorkerThread.Create(True);
        threads[j].FreeOnTerminate := False;
        with threads[j] do
        begin
          _Method := Method;
          _Param := Param;
          _Index := First + i;
          Priority := tpHigher;
          Resume; // run the tasks in parallel
        end;
        if (cnt >= len) then
        begin
          break;
        end;     
      end;
    end;
    Sleep(0); // yield the time slice; the main thread still polls (busy-waits)
  end;
  // wait for all threads
  for i := 0 to tnum - 1 do
  begin
    if (threads[i] <> nil) then
    begin
      WaitForSingleObject(threads[i].Handle, INFINITE);
      threads[i].Free;
    end;
  end;
end;
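The ThreadNumber argument is left to the caller. One possible way to choose it, sketched here as an assumption and not as part of the original unit, is to ask Windows for the number of logical processors. DefaultThreadNumber is a hypothetical helper name. GetSystemInfo and TSystemInfo come from the Windows unit.

```pascal
program ThreadCountDemo;
{$APPTYPE CONSOLE}

uses
  Windows;

// Hypothetical helper, not part of the original unit.
function DefaultThreadNumber: integer;
var
  Info: TSystemInfo;
begin
  GetSystemInfo(Info);
  Result := Info.dwNumberOfProcessors;
  if Result < 2 then
    Result := 2; // TParallelFor raises anything below 2 to 2 anyway
end;

begin
  WriteLn('Worker threads: ', DefaultThreadNumber);
end.
```

The result could then be passed as the last argument of TParallelFor in place of a hard-coded value.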

Example usage

Put all the code above in a separate unit, e.g. Parallel (it needs Classes, SysUtils and Windows in its uses clause), and then:

uses
  Parallel;

implementation

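type
  // assumed definition (not shown in the original post): a 2-D dynamic array of bytes
  _AAB = array of array of Byte;
  _PAAB = ^_AAB;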

procedure Parallel1(Arg: integer; const obj: TParallelParam);
var
  j, i, k: integer;
  map: _PAAB;
begin
  map := obj.Data;
  i := obj.First;
  j := obj.Last;
  if (i <= j) then
  begin
    for k := i to j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end
  else
  begin
    for k := i downto j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end;
end;

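procedure TestParallel;
var
  map: _AAB;
  obj: TParallelParam;
begin
  SetLength(map, 1000, 1000);
  obj := TParallelParam.Create;
  try
    obj.Data := @map;
    // First..Last is the range of k, i.e. the second dimension
    obj.SetValues(0, High(map[0]));
    // one job per row: Arg runs over the first dimension
    TParallelFor(@Parallel1, obj, 0, High(map), 8); // 8 threads
  finally
    obj.Free;
  end;
end;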

end.

Limitations

I could have used AsyncCalls to simplify the code in ParallelFor. However, AsyncCalls uses a thread pool to manage its threads: they are not terminated but kept alive for reuse until the unit is unloaded. This leads to a deadlock when the threads are freed in the finalization section of the unit (COM+ DLL).

D2007 and earlier have no generics or anonymous methods at all, so the fluent Parallel.For syntax is impossible without compiler support. You may have to change the TParallelParam object to adapt it to your needs. For example, a step parameter could be added in the future.
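Until a step parameter exists, one possible workaround is to map the stepped loop onto a dense index range and recover the real loop value inside the callback. The sketch below is an assumption about how that might look. StepCount and StepValue are hypothetical helpers that are not part of the unit above, and only positive steps are handled.

```pascal
program StepDemo;
{$APPTYPE CONSOLE}

// Hypothetical helpers, not part of the original unit.

// Number of iterations when counting from First up to Last in
// increments of Step (Step must be positive).
function StepCount(First, Last, Step: integer): integer;
begin
  if (Step <= 0) or (Last < First) then
    Result := 0
  else
    Result := (Last - First) div Step + 1;
end;

// Real loop value for the n-th dense index (n starts at 0).
function StepValue(First, Step, n: integer): integer;
begin
  Result := First + n * Step;
end;

var
  n: integer;
begin
  // The dense range 0..StepCount-1 is what would be handed to TParallelFor;
  // the callback would call StepValue to get the real loop value.
  for n := 0 to StepCount(10, 20, 3) - 1 do
    WriteLn(StepValue(10, 3, n));
end.
```

With this mapping the driver itself stays unchanged. First and Step would have to reach the callback through the Data pointer or through extra fields on TParallelParam.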

