From Delphi XE7, there is a Parallel.For. There is also a OmniThreadLibrary (OTL) but it doesn’t compile on D2007 because there is no support for Generics and Anonymous methods in Delphi 2007 or before. The AsyncCall is supported from D2006 to XE2 but it does not have a Parallel.For syntax.
D2007 doesn’t support Anonymous methods and generics.. and implementing such Parallel.For is quite tricky. However, Parallel.For does make multithreading code less, and simple so it would be useful in the future in optimising some core parts of the algorithms. For example, simply replacing 4 for loops in a API make the unit test.
TestRasterizationStadium20151010_tmp9DE7_x86
from 1 min 25 secs to
to 1 min 17 secs.
The following introduces a simple implementation of ParallelFor under D2007 without the generics and anoymous methods. It is very hard or nearly impossible to achieve the same fluid of programming style with the Parallel.For (from XE7 or OTL), but with this implementation, it is still easy and efficient to benefit from multi-cores with the similar Parallel.For approach.
First, we would need to define the parameters passing to the parallel function:
type TParallelParam = class public Data: Pointer; First: integer; Last: integer; constructor Create; overload; constructor Create(_Data: Pointer; _First, _Last: integer); overload; procedure SetValues(_First, _Last: integer); overload; procedure SetValues(_Data: Pointer; _First, _Last: integer); overload; destructor Destroy; override; end; constructor TParallelParam.Create; begin inherited Create; end; constructor TParallelParam.Create(_Data: Pointer; _First, _Last: integer); begin inherited Create; Self.SetValues(_Data, _First, _Last); end; procedure TParallelParam.SetValues(_First, _Last: Integer); begin Self.First := _First; Self.Last := _Last; end; procedure TParallelParam.SetValues(_Data: Pointer; _First, _Last: Integer); begin Self.Data := _Data; Self.First := _First; Self.Last := _Last; end; destructor TParallelParam.Destroy; begin inherited; end;
This TParallelParam has 3 fields that can be used by the parallel function to be executed by each thread. We define the Parallel Function prototype to be
type TParallelForFunc = procedure(Index: integer; const Param: TParallelParam);
The parameter index is different for each thread and the Param has all the data thread needs to access. The Data is a pointer type so you can store any parameters you like in a continuous memory locations that is accessed using the pointer.
We also need to wrap the Classes.TThread so allow passing these parameters:
type TWorkerThread = class(TThread) public _Method: TParallelForFunc; _Param: TParallelParam; _Index: integer; Done: boolean; constructor Create(CreateSuspended: Boolean); overload; destructor Destroy; override; protected procedure Execute; override; end; procedure TWorkerThread.Execute; begin try Self._Method(_Index, Self._Param); finally Self.Terminate; Self.Done := True; end; end; constructor TWorkerThread.Create(CreateSuspended: Boolean); begin inherited Create(CreateSuspended); Self.Done := False; end; destructor TWorkerThread.Destroy; begin inherited Destroy; end;
Thus, with all above support, we can then implement this ParallelFor:
procedure TParallelFor(Method: TParallelForFunc; const Param: TParallelParam; First, Last: integer; ThreadNumber: integer); var flags: array of boolean; threads: array of TWorkerThread; tnum, len, i, cnt, j: integer; begin len := Last - First + 1; if (len <= 0) then begin Raise Exception.Create('TParallelFor.Len = 0'); end; SetLength(flags, len); tnum := ThreadNumber; if (tnum < 2) then begin tnum := 2; end; SetLength(threads, tnum); i := 0; cnt := 0; // job count finished while (cnt < len) do // when not finished begin // find next available thread for j := 0 to tnum - 1 do begin // thread not assigned or finished if ((threads[j] = nil) or (threads[j].Done) or (threads[j].Terminated)) then begin // get next job while (flags[i]) do begin i := (i + 1) mod len; end; // mark the job being done flags[i] := True; // increment the counter Inc(cnt); // start the job if (threads[j] <> nil) then begin threads[j].Free; threads[j] := nil; end; if (threads[j] = nil) then begin threads[j] := TWorkerThread.Create(True); threads[j].FreeOnTerminate := False; end; with threads[j] do begin _Method := Method; _Param := Param; _Index := First + i; Priority := tpHigher; Resume; // run the tasks in parallel end; if (cnt >= len) then begin break; end; end; end; Sleep(0); // main thread busy spinning end; // wait for all threads for i := 0 to tnum - 1 do begin if (threads[i] <> nil) then begin WaitForSingleObject(threads[i].Handle, INFINITE); threads[i].Free; end; end; end;
Example usages
Put all above code in a separate unit, e.g. Parallel;
uses Parallel; implementation type _PAAB = ^_AAB; procedure Parallel1(Arg: integer; const obj: TParallelParam); var j, i, k: integer; map: _PAAB; begin map := obj.Data; i := obj.First; j := obj.Last; if (i <= j) then begin for k := i to j do begin if (map^[Arg][k] = 0) then begin map^[Arg][k] := 2; end else begin break; end; end; end else begin for k := i downto j do begin if (map^[Arg][k] = 0) then begin map^[Arg][k] := 2; end else begin break; end; end; end; end; procedure TestParallel; var map: _AAB; begin SetLength(map, 1000, 1000); obj := TParallelParam.Create; try obj.Data := @map; obj.SetValues(0, High(map)); TParallelFor(@Parallel1, obj, 0, High(map[0]), 8); // 8 threads finally obj.Free; end; end; end.
Limitations
I could have use AsyncCalls to simplified the code in the ParallelFor. However, since AsyncCall uses a Thread Pool to manage the threads. Threads are not terminated but kept in memory for needs until the unit is unloaded. It will have a thread deadlocks when the threads are freed in the finalization section of unit (com+ dll).
There is no Generics and Anonymous support at all under D2007 or before, so it would be impossible without compiler’s support. You may have to change the TParallelParam object to adapter your needs. For example, The step parameter could be added in the future.
Relevant Links
1. Delphi AsyncCalls Deadlock when Terminating the Threads (com+ dll finalization)
2. OTL can’t be compiled under D2007
3. Is it ok not to free objects before unit unloads from memory (finalization section)?
4. The Ultimate Delphi Threading Library
–EOF (The Ultimate Computing & Technology Blog) —
loading...
Last Post: Delphi TParallel Cleanup Needed
Next Post: Monitor the System Hardware Temperature (CPU) on Windows using SpeedFan [Freeware Download]