Since Delphi XE7 there is a Parallel.For (TParallel.For in the System.Threading unit). There is also the OmniThreadLibrary (OTL), but it doesn't compile on D2007 because Delphi 2007 and earlier have no support for generics or anonymous methods. AsyncCalls supports D2006 to XE2, but it does not offer a Parallel.For syntax.
D2007 supports neither anonymous methods nor generics, so implementing such a Parallel.For is quite tricky. However, Parallel.For makes multithreaded code shorter and simpler, so it would be useful for optimising some core parts of the algorithms. For example, simply replacing 4 for loops in an API brings the unit test TestRasterizationStadium20151010_tmp9DE7_x86 down from 1 min 25 secs to 1 min 17 secs.
The following introduces a simple implementation of ParallelFor under D2007 without generics and anonymous methods. It is very hard, or nearly impossible, to achieve the same fluent programming style as Parallel.For (from XE7 or OTL), but with this implementation it is still easy and efficient to benefit from multiple cores with a similar Parallel.For approach.
First, we need to define the parameters passed to the parallel function:
type
  TParallelParam = class
  public
    Data: Pointer;
    First: integer;
    Last: integer;
    constructor Create; overload;
    constructor Create(_Data: Pointer; _First, _Last: integer); overload;
    procedure SetValues(_First, _Last: integer); overload;
    procedure SetValues(_Data: Pointer; _First, _Last: integer); overload;
    destructor Destroy; override;
  end;

constructor TParallelParam.Create;
begin
  inherited Create;
end;

constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer);
begin
  inherited Create;
  Self.SetValues(_Data, _First, _Last);
end;

procedure TParallelParam.SetValues(_First, _Last: Integer);
begin
  Self.First := _First;
  Self.Last := _Last;
end;

procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer);
begin
  Self.Data := _Data;
  Self.First := _First;
  Self.Last := _Last;
end;

destructor TParallelParam.Destroy;
begin
  inherited;
end;
This TParallelParam has 3 fields that can be used by the parallel function executed by each thread. We define the parallel function prototype to be:
type TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);
The Index parameter is different for each call, and Param holds all the data the thread needs to access. Data is a pointer, so you can store any parameters you like in a contiguous block of memory and access them through that pointer.
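As an illustration of the Data pointer, here is a minimal sketch that packs several values into one record and reads them back inside a worker routine. The record TScaleArgs, the routine ScaleOne and the unit name Parallel are assumptions made for this example, and a plain for loop stands in for the worker threads so that only the parameter passing is shown.

```pascal
program DataRecordSketch;
{$APPTYPE CONSOLE}

uses
  Parallel; // assumed unit name holding the TParallelParam class from this post

type
  // Hypothetical record: one contiguous block with everything a worker needs.
  PScaleArgs = ^TScaleArgs;
  TScaleArgs = record
    Factor: Integer;
    Values: array[0..7] of Integer;
  end;

// Same signature as TParallelForFunc: Index selects the element,
// Param.Data points at the shared record.
procedure ScaleOne(Index: Integer; const Param: TParallelParam);
var
  Args: PScaleArgs;
begin
  Args := PScaleArgs(Param.Data);
  Args^.Values[Index] := Args^.Values[Index] * Args^.Factor;
end;

var
  Shared: TScaleArgs;
  Param: TParallelParam;
  i: Integer;
begin
  Shared.Factor := 3;
  for i := 0 to 7 do
    Shared.Values[i] := i;

  Param := TParallelParam.Create(@Shared, 0, 7);
  try
    // Sequential stand-in for the threads: each call gets its own Index.
    for i := Param.First to Param.Last do
      ScaleOne(i, Param);
  finally
    Param.Free;
  end;

  for i := 0 to 7 do
    Write(Shared.Values[i], ' ');
  Writeln;
end.
```

Each call writes to a different element, so no locking is needed in this sketch. Workers that update the same field of the record would need their own synchronisation.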
We also need to wrap Classes.TThread to allow passing these parameters:
type
  TWorkerThread = class(TThread)
  public
    _Method: TParallelForFunc;
    _Param: TParallelParam;
    _Index: integer;
    Done: boolean;
    constructor Create(CreateSuspended: Boolean); overload;
    destructor Destroy; override;
  protected
    procedure Execute; override;
  end;

procedure TWorkerThread.Execute;
begin
  try
    Self._Method(_Index, Self._Param);
  finally
    Self.Terminate;
    Self.Done := True;
  end;
end;

constructor TWorkerThread.Create(CreateSuspended: Boolean);
begin
  inherited Create(CreateSuspended);
  Self.Done := False;
end;

destructor TWorkerThread.Destroy;
begin
  inherited Destroy;
end;
With all of the above in place, we can implement ParallelFor:
procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer);
var
  flags: array of boolean;
  threads: array of TWorkerThread;
  tnum, len, i, cnt, j: integer;
begin
  len := Last - First + 1;
  if (len <= 0) then
  begin
    Raise Exception.Create('TParallelFor.Len = 0');
  end;
  SetLength(flags, len);
  tnum := ThreadNumber;
  if (tnum < 2) then
  begin
    tnum := 2;
  end;
  SetLength(threads, tnum);
  i := 0;
  cnt := 0; // job count finished
  while (cnt < len) do // when not finished
  begin
    // find next available thread
    for j := 0 to tnum - 1 do
    begin
      // thread not assigned or finished
      if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then
      begin
        // get next job
        while (flags[i]) do
        begin
          i := (i + 1) mod len;
        end;
        // mark the job being done
        flags[i] := True;
        // increment the counter
        Inc(cnt);
        // start the job
        if (threads[j] <> nil) then
        begin
          threads[j].Free;
          threads[j] := nil;
        end;
        if (threads[j] = nil) then
        begin
          threads[j] := TWorkerThread.Create(True);
          threads[j].FreeOnTerminate := False;
        end;
        with threads[j] do
        begin
          _Method := Method;
          _Param := Param;
          _Index := First + i;
          Priority := tpHigher;
          Resume; // run the tasks in parallel
        end;
        if (cnt >= len) then
        begin
          break;
        end;
      end;
    end;
    Sleep(0); // main thread busy spinning
  end;
  // wait for all threads
  for i := 0 to tnum - 1 do
  begin
    if (threads[i] <> nil) then
    begin
      WaitForSingleObject(threads[i].Handle, INFINITE);
      threads[i].Free;
    end;
  end;
end;
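The ThreadNumber argument is left to the caller. One possible way to pick it, as a minimal sketch assuming Windows: ask the operating system for the number of logical processors with GetSystemInfo. The helper name DefaultThreadCount is hypothetical and is not part of the unit above.

```pascal
program ThreadCountSketch;
{$APPTYPE CONSOLE}

uses
  Windows;

// Hypothetical helper: one worker per logical processor, never fewer than 2
// (TParallelFor raises any smaller ThreadNumber to 2 anyway).
function DefaultThreadCount: Integer;
var
  Info: TSystemInfo;
begin
  GetSystemInfo(Info);
  Result := Integer(Info.dwNumberOfProcessors);
  if Result < 2 then
    Result := 2;
end;

begin
  Writeln('Suggested ThreadNumber: ', DefaultThreadCount);
end.
```

The returned value could then be passed as the last argument of TParallelFor instead of a hard-coded number.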
Example usages
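Put all of the above code in a separate unit, e.g. Parallel (its uses clause needs Windows, Classes and SysUtils for WaitForSingleObject, TThread and Exception), then use it as follows: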
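uses
  Parallel;

implementation

type
  _AAB = array of array of Byte; // assumed element type; the original post does not show this declaration
  _PAAB = ^_AAB;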
procedure Parallel1(Arg: integer; const obj: TParallelParam);
var
  j, i, k: integer;
  map: _PAAB;
begin
  map := obj.Data;
  i := obj.First;
  j := obj.Last;
  if (i <= j) then
  begin
    for k := i to j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end
  else
  begin
    for k := i downto j do
    begin
      if (map^[Arg][k] = 0) then
      begin
        map^[Arg][k] := 2;
      end
      else
      begin
        break;
      end;
    end;
  end;
end;
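procedure TestParallel;
var
  map: _AAB;
  obj: TParallelParam; // was missing from the var section
begin
  SetLength(map, 1000, 1000);
  obj := TParallelParam.Create;
  try
    obj.Data := @map;
    // First..Last is the column range scanned inside each row
    obj.SetValues(0, High(map[0]));
    // one job per row (Arg indexes the first dimension), spread over 8 threads
    TParallelFor(@Parallel1, obj, 0, High(map), 8);
  finally
    obj.Free;
  end;
end;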
end.
Limitations
I could have used AsyncCalls to simplify the code in ParallelFor. However, AsyncCalls uses a thread pool to manage its threads: they are not terminated but kept in memory for reuse until the unit is unloaded. This causes a thread deadlock when the threads are freed in the finalization section of the unit (COM+ DLL).
There is no support for generics or anonymous methods at all under D2007 or earlier, so the XE7/OTL style is impossible without compiler support. You may have to change the TParallelParam object to adapt it to your needs. For example, a step parameter could be added in the future.
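Until a step parameter exists, one possible workaround is to run a dense job range through TParallelFor and map each job number to the stepped loop value inside the worker. The sketch below shows only that arithmetic. StepCount and SteppedIndex are hypothetical helpers, and only positive steps are handled.

```pascal
program StepSketch;
{$APPTYPE CONSOLE}

// Number of iterations in First, First + Step, ... that do not pass Last.
// Only positive steps are handled in this sketch.
function StepCount(First, Last, Step: Integer): Integer;
begin
  if (Step <= 0) or (Last < First) then
    Result := 0
  else
    Result := (Last - First) div Step + 1;
end;

// Loop value for the K-th job, with K counted from 0.
function SteppedIndex(First, Step, K: Integer): Integer;
begin
  Result := First + K * Step;
end;

var
  K: Integer;
begin
  // Jobs 0..StepCount-1 could be handed to TParallelFor as First..Last;
  // each worker would then call SteppedIndex to recover its loop value.
  for K := 0 to StepCount(10, 20, 3) - 1 do
    Writeln(K, ' -> ', SteppedIndex(10, 3, K)); // visits 10, 13, 16, 19
end.
```

With this approach the Start and Step values would travel to the workers through TParallelParam.Data, and TParallelFor itself stays unchanged.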
Relevant Links
1. Delphi AsyncCalls Deadlock when Terminating the Threads (com+ dll finalization)
2. OTL can’t be compiled under D2007
3. Is it ok not to free objects before unit unloads from memory (finalization section)?
4. The Ultimate Delphi Threading Library