Как синхронизировать TParallell в Delphi XE7 для регистрации данных

Мне нужно регистрировать некоторые данные и предпочтительно копировать файлы с помощью потока, но с использованием кода ниже, но это просто зависает в моем приложении.

если я правильно понимаю всю библиотеку XE7 Parallell, TThread.Queue и TThread.Synchronize должны синхронизироваться с основным потоком, но в моем случае все приложение зависает.

Что я делаю не так?

procedure TCopyDeviceContent.StartCopy;
var
  OK: boolean;
begin
  OK := false;

  //  showmessage('fFiles.Count = '+inttostr(fFiles.Count));

  if fFiles.Count = 0 then
  begin
    NotifyDone;
    exit;
  end;

  TParallel.For(0, fFiles.Count-1,
    procedure (Value: Integer)
    begin
      TThread.Queue(TThread.CurrentThread, //Here is where it freezes
        procedure
        begin
          Log('Setting fCurrentFile to '+fFiles.Strings[value]);
        end
      );

      sleep(1000);

      fCurrentFile := fFiles.Strings[value];
      Log('Triggering fOnBeforeProcess');
      if assigned(fOnBeforeProcess) then fOnBeforeProcess(self);

      Log('Does file exist?');
      if FileExists(fFiles.Strings[value]) = true then
      begin
        Log('Yes!');
        Log('Trying to copy file to Windows temp folder.');
        try
          TFile.Copy(fFiles.Strings[value], GetEnvironmentVariable('TEMP'));
        finally
          OK := true;
        end;

        if OK = true then
        begin
          Log('Success!');
          OK := false;

          Log('Does file exist in Windows temp folder?');
          if FileExists(GetEnvironmentVariable('TEMP')+ExtractFileName(fFiles.Strings[value])) then
          begin
            Log('Yes');
            Log('Trying to copy file from Windows temp folder to final destination: '+DestPath+DateToStr(Now)+'\'+ExtractFileName(fFiles.Strings[value]));
            try
              TFile.Move(GetEnvironmentVariable('TEMP')+ExtractFileName(fFiles.Strings[value]),
              DestPath+DateToStr(Now)+'\'+ExtractFileName(fFiles.Strings[value]));
            finally
              fFilesOK.Add(fFiles.Strings[value]);
              Log('Sucess!');
            end;
          end;    
        end
        else
        begin
          fFilesFailed.Add(fFiles.Strings[value]);
          Log('Failed copying to Windows temp folder!');
        end;
      end;
      inc(fProgress);
      NotifyProgress;
      Log('File copy success. Moving on to next file if available...');
    end
  );

  NotifyDone;

  if fFilesFailed.Count > 0 then NotifyError;
end;

person Aid Vllasaliu    schedule 10.03.2015    source источник
comment
Не передавать поток при вызове TThread.Queue. Используйте перегруженный вызов без этой опции. См. раздел Использование параллельной библиотеки Delphi XE7.   -  person LU RD    schedule 10.03.2015
comment
Исправление: код внутри TThread.Queue выполняется ПОСЛЕ завершения потока. TThread.Synchronizeэто то, что мне нужно для моей реализации.   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@LURD нет перегруженного вызова без опции. Где я могу найти это?   -  person Aid Vllasaliu    schedule 10.03.2015
comment
TThread.Queue(nil, Aproc); должно работать.   -  person LU RD    schedule 10.03.2015
comment
TThread.Queue с nil, назначенным первому параметру, работает, но, почитав об этом подробнее, оказывается, что любой код внутри TThread.Queue выполняется после завершения потока. Мне это не нужно, мне нужно регистрировать каждый шаг процесса по мере его выполнения, поэтому TThread.Synchronize больше подходит.   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@LURD, кстати, что такое Aproc?   -  person Aid Vllasaliu    schedule 10.03.2015
comment
Это сокращение от анонимной процедуры.   -  person LU RD    schedule 10.03.2015
comment
код внутри TThread.Queue выполняется ПОСЛЕ завершения потока. Не обязательно верно. Код будет выполнен в конце концов, в документах никогда не говорилось то, что я сказал.   -  person EProgrammerNotFound    schedule 10.03.2015
comment
@EProgrammerNotFound, вы правы!   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@AidVllasaliu сказать, что код в TThread.Queue выполняется после завершения потока, совершенно неверно.   -  person iamjoosy    schedule 10.03.2015
comment
Код, переданный в TThread.Queue(), выполняется всякий раз, когда основной поток выполняет его. Это не имеет ничего общего со временем жизни потока, который вызывает Queue(). Поток может быть все еще запущен или завершен. Дело в том, что Queue() является асинхронным, он немедленно завершает работу и не ждет выполнения кода. TThread.Synchronize() ждет.   -  person Remy Lebeau    schedule 10.03.2015
comment
Что делает процедура Log()? Обновляет пользовательский интерфейс, записывает в файл?   -  person iPath ツ    schedule 10.03.2015
comment
В этом особом случае событие в очереди выполняется после TParallel.For только потому, что это блокирующая операция и ждет, пока все не будет выполнено. TThread.Sychronize должно вызвать тупик здесь   -  person Sir Rufo    schedule 10.03.2015
comment
@iPathツ log() просто добавляет строку строки к TMemo   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@AidVllasaliu использует ли метод Log() TThread.Synchronize/queue?   -  person iPath ツ    schedule 10.03.2015
comment
@iPathツ Посмотрите на мой ответ, который объясняет, почему сообщения в очереди обрабатываются потом   -  person Sir Rufo    schedule 10.03.2015
comment
@iPathツ нет, метод Log() находится в основном потоке. на самом деле совсем в другом подразделении.   -  person Aid Vllasaliu    schedule 11.03.2015
comment
@AidVllasaliu, такой метод, как ваш метод Log(), не может находиться в основном потоке. Его можно вызывать из любого потока, включая основной поток. То есть он выполняется в контексте потока. В вашем примере вы в основном вызываете метод Log() из контекста текущего рабочего потока, а не из основного потока. Единственный способ вызвать его в контексте основного потока — использовать методы Synchronize/Queue. Единственная разница между этими двумя методами заключается в том, что Synchronize — это блокирующая функция, а Queue — не блокирующая.   -  person iPath ツ    schedule 11.03.2015
comment
@iPathツ Ну, я имею в виду свой код, который сейчас работает, благодаря Эдину и LURD. Их ответ работает правильно, и я заменил их строку Form1.Memo1..... своим методом Log().   -  person Aid Vllasaliu    schedule 11.03.2015


Ответы (2)


Если цель состоит в том, чтобы просто копировать файлы без зависания потока пользовательского интерфейса, я бы просто использовал что-то вроде этого:

procedure TCopyDeviceContent.StartCopy;
var
 aTask: ITask;
begin
 aTask := TTask.Create (procedure ()
   begin
      // Copy files here  
      TThread.Synchronize(nil,procedure
                  begin
                     //Interact with UI  
                     Form1.Memo1.Lines.Add(‘Begin Execution’);
                  end);
   end);
 aTask.Start;
end;

Внутри процедуры задачи просто скопируйте файлы, как обычно, я не уверен, поможет ли вам копирование с использованием нескольких потоков.

Если вам нужно взаимодействовать с пользовательским интерфейсом, вам нужно вернуться к потоку пользовательского интерфейса, вы можете использовать TThread.Synchronize.

person Edin Omeragic    schedule 10.03.2015

TParallel.For выполняет потоковое выполнение событий итерации, но сам по себе является блокирующим методом. Так что для этого вам придется быть осторожным с синхронизацией, если вы запускаете это из основного потока.

Использование TThread.Queue работает безопасно, но, как вы уже заметили, все события в очереди обрабатываются после завершения TParallel.For - фактически после выхода из метода и возврата в состояние ожидания.

Использование TThread.Synchronize вызовет взаимную блокировку, если вы используете его в событиях итерации и запускаете TParallel.For из основного потока.

Вот небольшое приложение, показывающее разницу с использованием

  • CopyFiles
  • ParallelCopyFiles
  • AsyncCopyFiles вызывает CopyFiles из задачи
  • AsyncParallelCopyFiles вызывает ParallelCopyFiles из задачи

И я предполагаю, что AsyncParallelCopyFiles это тот, кого вы ищете.

Внутри методов Async... безопасно использовать TThread.Synchronize - если вы не ждете задачи внутри основного потока.

unit Form.Main;

interface

uses
  System.IOUtils,
  System.Threading,
  System.Types,

  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TLogMsg = record
  private
    FMsg: string;
    FThreadID: Cardinal;
    FOccurred: TDateTime;
  public
    class operator implicit( a: string ): TLogMsg;
    class operator implicit( a: TLogMsg ): string;

    constructor Create( const AMsg: string );
    function ToString: string;

    property Msg: string read FMsg;
    property ThreadID: Cardinal read FThreadID;
    property Occurred: TDateTime read FOccurred;
  end;

type
  TForm1 = class( TForm )
    ListBox1: TListBox;
    RadioGroup1: TRadioGroup;
    Button1: TButton;
    procedure Button1Click( Sender: TObject );
  private
    FTask: ITask;
    procedure ThreadSafeLog( ALogMsg: TLogMsg );
  public
    procedure CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
    procedure ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );

    function AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
    function AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}
{ TForm1 }

// *** ATTENTION ***
// ParallelCopyFiles will cause a dead lock without USE_QUEUE
// but you still can try yourself ...
//
{$DEFINE USE_QUEUE}
//
// *****************

procedure TForm1.ThreadSafeLog( ALogMsg: TLogMsg );
begin
{$IFDEF USE_QUEUE}
  TThread.Queue
{$ELSE}
  TThread.Synchronize
{$ENDIF}
    ( nil,
      procedure
    begin
      ListBox1.Items.Add( ALogMsg );
    end );
end;

procedure TForm1.CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
var
  LSource, LDestination: string;
begin
  ThreadSafeLog( 'CopyFiles - ENTER' );
  for LSource in AFiles do
    begin
      LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
      ThreadSafeLog( 'Copy ' + LSource );
      TFile.Copy( LSource, LDestination, Overwrite );
    end;
  ThreadSafeLog( 'CopyFiles - EXIT' );
end;

procedure TForm1.ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
begin
  ThreadSafeLog( 'ParallelCopyFiles - ENTER' );
  TParallel.&For( Low( AFiles ), High( AFiles ),
    procedure( AIndex: Integer )
    var
      LSource, LDestination: string;
    begin
      LSource := AFiles[AIndex];
      LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
      ThreadSafeLog( 'Copy ' + LSource );
      TFile.Copy( LSource, LDestination, Overwrite );
    end );
  ThreadSafeLog( 'ParallelCopyFiles - EXIT' );
end;

function TForm1.AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
  ThreadSafeLog( 'AsyncCopyFiles - ENTER' );
  Result := TTask.Run(
    procedure
    begin
      CopyFiles( AFiles, ADestPath, Overwrite );
      TThread.Synchronize( nil, ACallback );
    end );
  ThreadSafeLog( 'AsyncCopyFiles - EXIT' );
end;

function TForm1.AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
  ThreadSafeLog( 'AsyncParallelCopyFiles - ENTER' );
  Result := TTask.Run(
    procedure
    begin
      ParallelCopyFiles( AFiles, ADestPath, Overwrite );
      TThread.Synchronize( nil, ACallback );
    end );
  ThreadSafeLog( 'AsyncParallelCopyFiles - EXIT' );
end;

procedure TForm1.Button1Click( Sender: TObject );
var
  LFiles: TStringDynArray;
  LDestPath: string;
begin
  ListBox1.Clear; // Clear the log destination

  LFiles := TDirectory.GetFiles( TPath.GetDocumentsPath, '*.*' );
  LDestPath := TPath.Combine( TPath.GetDocumentsPath, '_COPYTEST_' );
  TDirectory.CreateDirectory( LDestPath );

  case RadioGroup1.ItemIndex of
    0:
      CopyFiles( LFiles, LDestPath, True );
    1:
      ParallelCopyFiles( LFiles, LDestPath, True );
    2:
      begin
        Button1.Enabled := False;
        AsyncCopyFiles( LFiles, LDestPath, True,
          procedure
          begin
            Button1.Enabled := True;
          end );
      end;
    3:
      begin
        Button1.Enabled := False;
        AsyncParallelCopyFiles( LFiles, LDestPath, True,
          procedure
          begin
            Button1.Enabled := True;
          end );
      end;
  end;
end;

{ TLogMsg }

constructor TLogMsg.Create( const AMsg: string );
begin
  FMsg := AMsg;
  FThreadID := TThread.CurrentThread.ThreadID;
  FOccurred := Now;
end;

class operator TLogMsg.implicit( a: string ): TLogMsg;
begin
  Result := TLogMsg.Create( a );
end;

class operator TLogMsg.implicit( a: TLogMsg ): string;
begin
  Result := a.ToString;
end;

function TLogMsg.ToString: string;
begin
  Result := Format( '$%8.8x [%s] %s', [FThreadID, FormatDateTime( 'hh:nn:ss.zzz', FOccurred ), FMsg] );
end;

end.

ОБНОВЛЕНО

Я просто расширяю сообщение журнала дополнительной информацией о потоке и времени появления сообщения.

person Sir Rufo    schedule 10.03.2015
comment
Вы уверены в тупиковой ситуации, если Parallel.For запускается из основного потока с вызовами для синхронизации? Я не использовал библиотеку потоков, но моя собственная реализация помещает накладные расходы Parallel.For в собственный поток. - person LU RD; 10.03.2015
comment
@LURD Это причина переключения компилятора. Вы получите тупик, и я был бы очень удивлен, если бы не. Как вы можете синхронизировать поток с основным потоком, если основной поток ожидает завершения этого потока? Вы не можете - person Sir Rufo; 11.03.2015
comment
@LURD TParallel.ForWorker в PPL создает отдельную задачу для реальной работы, но заканчивается RootTask.Start.Wait(). Итак, TParallel.For — это блокирующая функция. - person iPath ツ; 11.03.2015
comment
@SirRufo +100 за объяснение и примеры. Я думаю, что ваш пост должен быть выбран в качестве ответа :) - person iPath ツ; 11.03.2015
comment
@iPathツ, да, я просмотрел это и подтвердил тупиковую ситуацию. Я ожидал ожидания вызова CheckSynchronize или чего-то подобного, чтобы избежать взаимоблокировки, но этого не произошло. - person LU RD; 11.03.2015
comment
Спасибо за это полностью прокомментированное объяснение того, почему у меня возникла тупиковая ситуация в моем TParallel.For спящий цикл ^^ - person Darkendorf; 24.12.2015
comment
@SirRufo, вы определили ThreadSafeLog как procedure TForm1.ThreadSafeLog( ALogMsg: TLogMsg );. Затем позже вы называете это как ThreadSafeLog( 'ParallelCopyFiles - EXIT' );. Он компилируется? - person Interface Unknown; 03.07.2017
comment
@InterfaceUnknown Да, компилируется - person Sir Rufo; 03.07.2017
comment
@SirRufo, спасибо. Я, наверное, что-то упустил... Изучу код глубже. - person Interface Unknown; 03.07.2017
comment
@InterfaceUnknown Волшебство произойдет из-за неявного оператора TLogMsg. - person Sir Rufo; 03.07.2017