TParallel.For
выполняет потоковое выполнение событий итерации, но сам по себе является блокирующим методом. Так что для этого вам придется быть осторожным с синхронизацией, если вы запускаете это из основного потока.
Использование TThread.Queue
работает безопасно, но, как вы уже заметили, все события в очереди обрабатываются после завершения TParallel.For
- фактически после выхода из метода и возврата в состояние ожидания.
Использование TThread.Synchronize
вызовет взаимную блокировку, если вы используете его в событиях итерации и запускаете TParallel.For
из основного потока.
Вот небольшое приложение, показывающее разницу с использованием
CopyFiles
ParallelCopyFiles
AsyncCopyFiles
вызывает CopyFiles
из задачи
AsyncParallelCopyFiles
вызывает ParallelCopyFiles
из задачи
И я предполагаю, что AsyncParallelCopyFiles
это тот, кого вы ищете.
Внутри методов Async...
безопасно использовать TThread.Synchronize
- если вы не ждете задачи внутри основного потока.
unit Form.Main;
interface
uses
System.IOUtils,
System.Threading,
System.Types,
Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, Vcl.ExtCtrls;
type
TLogMsg = record
private
FMsg: string;
FThreadID: Cardinal;
FOccurred: TDateTime;
public
class operator implicit( a: string ): TLogMsg;
class operator implicit( a: TLogMsg ): string;
constructor Create( const AMsg: string );
function ToString: string;
property Msg: string read FMsg;
property ThreadID: Cardinal read FThreadID;
property Occurred: TDateTime read FOccurred;
end;
type
TForm1 = class( TForm )
ListBox1: TListBox;
RadioGroup1: TRadioGroup;
Button1: TButton;
procedure Button1Click( Sender: TObject );
private
FTask: ITask;
procedure ThreadSafeLog( ALogMsg: TLogMsg );
public
procedure CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
procedure ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
function AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
function AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
{ TForm1 }
// *** ATTENTION ***
// ParallelCopyFiles will cause a dead lock without USE_QUEUE
// but you still can try yourself ...
//
{$DEFINE USE_QUEUE}
//
// *****************
procedure TForm1.ThreadSafeLog( ALogMsg: TLogMsg );
begin
{$IFDEF USE_QUEUE}
TThread.Queue
{$ELSE}
TThread.Synchronize
{$ENDIF}
( nil,
procedure
begin
ListBox1.Items.Add( ALogMsg );
end );
end;
procedure TForm1.CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
var
LSource, LDestination: string;
begin
ThreadSafeLog( 'CopyFiles - ENTER' );
for LSource in AFiles do
begin
LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
ThreadSafeLog( 'Copy ' + LSource );
TFile.Copy( LSource, LDestination, Overwrite );
end;
ThreadSafeLog( 'CopyFiles - EXIT' );
end;
procedure TForm1.ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
begin
ThreadSafeLog( 'ParallelCopyFiles - ENTER' );
TParallel.&For( Low( AFiles ), High( AFiles ),
procedure( AIndex: Integer )
var
LSource, LDestination: string;
begin
LSource := AFiles[AIndex];
LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
ThreadSafeLog( 'Copy ' + LSource );
TFile.Copy( LSource, LDestination, Overwrite );
end );
ThreadSafeLog( 'ParallelCopyFiles - EXIT' );
end;
function TForm1.AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
ThreadSafeLog( 'AsyncCopyFiles - ENTER' );
Result := TTask.Run(
procedure
begin
CopyFiles( AFiles, ADestPath, Overwrite );
TThread.Synchronize( nil, ACallback );
end );
ThreadSafeLog( 'AsyncCopyFiles - EXIT' );
end;
function TForm1.AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
ThreadSafeLog( 'AsyncParallelCopyFiles - ENTER' );
Result := TTask.Run(
procedure
begin
ParallelCopyFiles( AFiles, ADestPath, Overwrite );
TThread.Synchronize( nil, ACallback );
end );
ThreadSafeLog( 'AsyncParallelCopyFiles - EXIT' );
end;
procedure TForm1.Button1Click( Sender: TObject );
var
LFiles: TStringDynArray;
LDestPath: string;
begin
ListBox1.Clear; // Clear the log destination
LFiles := TDirectory.GetFiles( TPath.GetDocumentsPath, '*.*' );
LDestPath := TPath.Combine( TPath.GetDocumentsPath, '_COPYTEST_' );
TDirectory.CreateDirectory( LDestPath );
case RadioGroup1.ItemIndex of
0:
CopyFiles( LFiles, LDestPath, True );
1:
ParallelCopyFiles( LFiles, LDestPath, True );
2:
begin
Button1.Enabled := False;
AsyncCopyFiles( LFiles, LDestPath, True,
procedure
begin
Button1.Enabled := True;
end );
end;
3:
begin
Button1.Enabled := False;
AsyncParallelCopyFiles( LFiles, LDestPath, True,
procedure
begin
Button1.Enabled := True;
end );
end;
end;
end;
{ TLogMsg }
constructor TLogMsg.Create( const AMsg: string );
begin
FMsg := AMsg;
FThreadID := TThread.CurrentThread.ThreadID;
FOccurred := Now;
end;
class operator TLogMsg.implicit( a: string ): TLogMsg;
begin
Result := TLogMsg.Create( a );
end;
class operator TLogMsg.implicit( a: TLogMsg ): string;
begin
Result := a.ToString;
end;
function TLogMsg.ToString: string;
begin
Result := Format( '$%8.8x [%s] %s', [FThreadID, FormatDateTime( 'hh:nn:ss.zzz', FOccurred ), FMsg] );
end;
end.
ОБНОВЛЕНО
Я просто расширяю сообщение журнала дополнительной информацией о потоке и времени появления сообщения.
person
Sir Rufo
schedule
10.03.2015
TThread.Queue
выполняется ПОСЛЕ завершения потока.TThread.Synchronize
это то, что мне нужно для моей реализации. - person Aid Vllasaliu   schedule 10.03.2015TThread.Queue(nil, Aproc);
должно работать. - person LU RD   schedule 10.03.2015TThread.Queue
сnil
, назначенным первому параметру, работает, но, почитав об этом подробнее, оказывается, что любой код внутриTThread.Queue
выполняется после завершения потока. Мне это не нужно, мне нужно регистрировать каждый шаг процесса по мере его выполнения, поэтомуTThread.Synchronize
больше подходит. - person Aid Vllasaliu   schedule 10.03.2015TThread.Queue()
, выполняется всякий раз, когда основной поток выполняет его. Это не имеет ничего общего со временем жизни потока, который вызываетQueue()
. Поток может быть все еще запущен или завершен. Дело в том, чтоQueue()
является асинхронным, он немедленно завершает работу и не ждет выполнения кода.TThread.Synchronize()
ждет. - person Remy Lebeau   schedule 10.03.2015TParallel.For
только потому, что это блокирующая операция и ждет, пока все не будет выполнено.TThread.Sychronize
должно вызвать тупик здесь - person Sir Rufo   schedule 10.03.2015log()
просто добавляет строку строки кTMemo
- person Aid Vllasaliu   schedule 10.03.2015Form1.Memo1.....
своим методомLog()
. - person Aid Vllasaliu   schedule 11.03.2015