วิธีซิงโครไนซ์ TParallell ใน Delphi XE7 เพื่อบันทึกข้อมูล

ฉันจำเป็นต้องบันทึกข้อมูลบางส่วนและควรคัดลอกไฟล์โดยใช้เธรด แต่โดยการใช้โค้ดด้านล่าง แต่มันกลับทำให้แอปของฉันค้าง

ถ้าฉันเข้าใจไลบรารี XE7 Parallell ทั้งหมดอย่างถูกต้อง TThread.Queue และ TThread.Synchronize ควรจะซิงค์กับเธรดหลัก แต่ในกรณีของฉัน แอปทั้งหมดค้าง

ผมทำอะไรผิดหรือเปล่า?

procedure TCopyDeviceContent.StartCopy;
var
  OK: boolean;
begin
  OK := false;

  //  showmessage('fFiles.Count = '+inttostr(fFiles.Count));

  if fFiles.Count = 0 then
  begin
    NotifyDone;
    exit;
  end;

  TParallel.For(0, fFiles.Count-1,
    procedure (Value: Integer)
    begin
      TThread.Queue(TThread.CurrentThread, //Here is where it freezes
        procedure
        begin
          Log('Setting fCurrentFile to '+fFiles.Strings[value]);
        end
      );

      sleep(1000);

      fCurrentFile := fFiles.Strings[value];
      Log('Triggering fOnBeforeProcess');
      if assigned(fOnBeforeProcess) then fOnBeforeProcess(self);

      Log('Does file exist?');
      if FileExists(fFiles.Strings[value]) = true then
      begin
        Log('Yes!');
        Log('Trying to copy file to Windows temp folder.');
        try
          TFile.Copy(fFiles.Strings[value], GetEnvironmentVariable('TEMP'));
        finally
          OK := true;
        end;

        if OK = true then
        begin
          Log('Success!');
          OK := false;

          Log('Does file exist in Windows temp folder?');
          if FileExists(GetEnvironmentVariable('TEMP')+ExtractFileName(fFiles.Strings[value])) then
          begin
            Log('Yes');
            Log('Trying to copy file from Windows temp folder to final destination: '+DestPath+DateToStr(Now)+'\'+ExtractFileName(fFiles.Strings[value]));
            try
              TFile.Move(GetEnvironmentVariable('TEMP')+ExtractFileName(fFiles.Strings[value]),
              DestPath+DateToStr(Now)+'\'+ExtractFileName(fFiles.Strings[value]));
            finally
              fFilesOK.Add(fFiles.Strings[value]);
              Log('Sucess!');
            end;
          end;    
        end
        else
        begin
          fFilesFailed.Add(fFiles.Strings[value]);
          Log('Failed copying to Windows temp folder!');
        end;
      end;
      inc(fProgress);
      NotifyProgress;
      Log('File copy success. Moving on to next file if available...');
    end
  );

  NotifyDone;

  if fFilesFailed.Count > 0 then NotifyError;
end;

person Aid Vllasaliu    schedule 10.03.2015    source แหล่งที่มา
comment
อย่าส่งผ่านเธรดเมื่อเรียก TThread.Queue ใช้สายที่โอเวอร์โหลดโดยไม่มีตัวเลือกนี้ ดูการใช้ Delphi XE7 Parallel Library   -  person LU RD    schedule 10.03.2015
comment
การแก้ไข: โค้ดภายใน TThread.Queue ถูกดำเนินการหลังจากเธรดเสร็จสิ้น TThread.Synchronizeคือสิ่งที่ฉันต้องการสำหรับการนำไปใช้งาน   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@LURD ไม่มีการโทรมากเกินไปโดยไม่มีตัวเลือก ฉันจะหามันได้ที่ไหน?   -  person Aid Vllasaliu    schedule 10.03.2015
comment
TThread.Queue(nil, Aproc); ควรใช้งานได้   -  person LU RD    schedule 10.03.2015
comment
TThread.Queue โดย nil ที่กำหนดให้กับพารามิเตอร์แรกใช้งานได้ แต่หลังจากอ่านเพิ่มเติมเกี่ยวกับสิ่งนี้ ปรากฎว่าโค้ดใดๆ ภายใน TThread.Queue จะถูกดำเนินการหลังจากเธรดเสร็จสิ้น ฉันไม่ต้องการสิ่งนั้น ฉันต้องบันทึกทุกขั้นตอนของกระบวนการในขณะที่มันเกิดขึ้น ดังนั้นเหตุใด TThread.Synchronize จึงเหมาะสมกว่า   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@LURD แล้ว Aproc คืออะไร?   -  person Aid Vllasaliu    schedule 10.03.2015
comment
ย่อมาจากขั้นตอนที่ไม่เปิดเผยตัวตน   -  person LU RD    schedule 10.03.2015
comment
โค้ดภายใน TThread.Queue จะถูกดำเนินการหลังจากที่เธรดเสร็จสิ้นแล้ว ไม่จำเป็นต้องเป็นจริงเสมอไป โค้ดจะดำเนินการในที่สุด เอกสารไม่เคยระบุสิ่งที่ฉันพูด   -  person EProgrammerNotFound    schedule 10.03.2015
comment
@EProgrammerNotFound คุณถูกต้อง!   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@AidVllasaliu เพื่อบอกว่าโค้ดใน TThread.Queue ดำเนินการหลังจากเธรดเสร็จสิ้นนั้นผิดธรรมดา   -  person iamjoosy    schedule 10.03.2015
comment
รหัสที่ส่งผ่านไปยัง TThread.Queue() จะถูกดำเนินการเมื่อใดก็ตามที่เธรดหลักเข้ามาดำเนินการ มันไม่เกี่ยวอะไรกับอายุการใช้งานของเธรดที่เรียก Queue() เธรดอาจยังคงทำงานอยู่ หรืออาจยุติลงแล้ว ประเด็นคือ Queue() อะซิงโครนัส มันจะออกทันทีและไม่รอให้โค้ดทำงาน TThread.Synchronize() จะรอ   -  person Remy Lebeau    schedule 10.03.2015
comment
ขั้นตอน Log() ทำหน้าที่อะไร? อัพเดต UI เขียนลงไฟล์?   -  person iPath ツ    schedule 10.03.2015
comment
ในกรณีพิเศษนี้ เหตุการณ์ที่อยู่ในคิวจะถูกดำเนินการหลังจาก TParallel.For เพียงเพราะเป็นการดำเนินการบล็อก และรอจนกว่าทุกอย่างจะเสร็จสิ้น TThread.Sychronize ควรทำให้เกิดการหยุดชะงักที่นี่   -  person Sir Rufo    schedule 10.03.2015
comment
@iPathツ log() เพียงเพิ่มบรรทัดของสตริงใน TMemo   -  person Aid Vllasaliu    schedule 10.03.2015
comment
@AidVllasaliu วิธีการ Log() ใช้ TThread.Synchronize/queue หรือไม่   -  person iPath ツ    schedule 10.03.2015
comment
@iPathツ ดูคำตอบของฉัน ซึ่งอธิบายว่าทำไมข้อความที่อยู่ในคิวจึงได้รับการประมวลผล หลังจากนั้น   -  person Sir Rufo    schedule 10.03.2015
comment
@iPathツ ไม่ Log() วิธีการอยู่ในเธรดหลัก ในหน่วยที่แตกต่างไปจากเดิมอย่างสิ้นเชิง   -  person Aid Vllasaliu    schedule 11.03.2015
comment
@AidVllasaliu วิธีการเช่นวิธี Log() ของคุณไม่สามารถอยู่ในเธรดหลักได้ สามารถเรียกจากเธรดใดก็ได้รวมถึงเธรดหลักด้วย นั่นคือมันถูกดำเนินการในบริบทของเธรด ในตัวอย่างของคุณ คุณเรียกเมธอด Log() เป็นหลักจากบริบทของเธรดสำหรับผู้ปฏิบัติงานปัจจุบัน ไม่ใช่จากเธรดหลัก วิธีเดียวที่จะเรียกมันในบริบทของเธรดหลักคือการใช้วิธีการซิงโครไนซ์/คิว ข้อแตกต่างระหว่างสองวิธีคือ Synchronize เป็นฟังก์ชันการบล็อก และ Queue ไม่ใช่ฟังก์ชันการบล็อก   -  person iPath ツ    schedule 11.03.2015
comment
@iPathツ ฉันหมายถึงรหัสของฉันซึ่งใช้งานได้ตอนนี้ขอบคุณ Edin และ LURD คำตอบของพวกเขาทำงานได้อย่างถูกต้อง และฉันได้แทนที่บรรทัด Form1.Memo1..... ของพวกเขาด้วยวิธี Log() ของฉันแล้ว   -  person Aid Vllasaliu    schedule 11.03.2015


คำตอบ (2)


หากเป้าหมายเป็นเพียงการคัดลอกไฟล์โดยไม่ค้างเธรด UI ฉันจะใช้สิ่งนี้:

procedure TCopyDeviceContent.StartCopy;
var
 aTask: ITask;
begin
 aTask := TTask.Create (procedure ()
   begin
      // Copy files here  
      TThread.Synchronize(nil,procedure
                  begin
                     //Interact with UI  
                     Form1.Memo1.Lines.Add(‘Begin Execution’);
                  end);
   end);
 aTask.Start;
end;

ภายในขั้นตอนงาน เพียงแค่คัดลอกไฟล์ตามปกติ ฉันไม่แน่ใจว่าการคัดลอกโดยใช้หลายเธรดจะช่วยคุณได้หรือไม่

ในกรณีที่คุณต้องการโต้ตอบกับ UI คุณต้องเปลี่ยนกลับไปเป็นเธรด UI คุณสามารถใช้ TThread.Synchronize

person Edin Omeragic    schedule 10.03.2015

TParallel.For ดำเนินการประมวลผลเหตุการณ์การวนซ้ำแบบเธรด แต่ตัวมันเองเป็นวิธีการบล็อก ดังนั้นคุณจะต้องระมัดระวังในการซิงโครไนซ์หากคุณเริ่มจากเธรดหลัก

การใช้ TThread.Queue ทำงานได้อย่างปลอดภัย แต่อย่างที่คุณสังเกตเห็นแล้วว่า กิจกรรมที่อยู่ในคิวทั้งหมดจะได้รับการประมวลผล หลังจาก TParallel.For เสร็จสิ้น - อันที่จริง หลังจากออกจากเมธอดและกลับสู่สถานะไม่ได้ใช้งาน

การใช้ TThread.Synchronize จะทำให้เกิด การล็อกตาย หากคุณใช้ในเหตุการณ์การวนซ้ำและเริ่มต้น TParallel.For จากเธรดหลัก

นี่เป็นแอปพลิเคชั่นเล็กๆ น้อยๆ ที่แสดงความแตกต่างในการใช้งาน

  • CopyFiles
  • ParallelCopyFiles
  • AsyncCopyFiles โทร CopyFiles จากงาน
  • AsyncParallelCopyFiles โทร ParallelCopyFiles จากงาน

และฉันคิดว่า AsyncParallelCopyFiles คือสิ่งที่คุณกำลังมองหา

ภายในวิธีการ Async... คุณสามารถใช้ TThread.Synchronize - ถ้าคุณไม่รองานภายในเธรดหลัก

unit Form.Main;

interface

uses
  System.IOUtils,
  System.Threading,
  System.Types,

  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TLogMsg = record
  private
    FMsg: string;
    FThreadID: Cardinal;
    FOccurred: TDateTime;
  public
    class operator implicit( a: string ): TLogMsg;
    class operator implicit( a: TLogMsg ): string;

    constructor Create( const AMsg: string );
    function ToString: string;

    property Msg: string read FMsg;
    property ThreadID: Cardinal read FThreadID;
    property Occurred: TDateTime read FOccurred;
  end;

type
  TForm1 = class( TForm )
    ListBox1: TListBox;
    RadioGroup1: TRadioGroup;
    Button1: TButton;
    procedure Button1Click( Sender: TObject );
  private
    FTask: ITask;
    procedure ThreadSafeLog( ALogMsg: TLogMsg );
  public
    procedure CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
    procedure ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );

    function AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
    function AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
  end;

var
  Form1: TForm1;

implementation

{$R *.dfm}
{ TForm1 }

// *** ATTENTION ***
// ParallelCopyFiles will cause a dead lock without USE_QUEUE
// but you still can try yourself ...
//
{$DEFINE USE_QUEUE}
//
// *****************

procedure TForm1.ThreadSafeLog( ALogMsg: TLogMsg );
begin
{$IFDEF USE_QUEUE}
  TThread.Queue
{$ELSE}
  TThread.Synchronize
{$ENDIF}
    ( nil,
      procedure
    begin
      ListBox1.Items.Add( ALogMsg );
    end );
end;

procedure TForm1.CopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
var
  LSource, LDestination: string;
begin
  ThreadSafeLog( 'CopyFiles - ENTER' );
  for LSource in AFiles do
    begin
      LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
      ThreadSafeLog( 'Copy ' + LSource );
      TFile.Copy( LSource, LDestination, Overwrite );
    end;
  ThreadSafeLog( 'CopyFiles - EXIT' );
end;

procedure TForm1.ParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean );
begin
  ThreadSafeLog( 'ParallelCopyFiles - ENTER' );
  TParallel.&For( Low( AFiles ), High( AFiles ),
    procedure( AIndex: Integer )
    var
      LSource, LDestination: string;
    begin
      LSource := AFiles[AIndex];
      LDestination := TPath.Combine( ADestPath, TPath.GetFileName( LSource ) );
      ThreadSafeLog( 'Copy ' + LSource );
      TFile.Copy( LSource, LDestination, Overwrite );
    end );
  ThreadSafeLog( 'ParallelCopyFiles - EXIT' );
end;

function TForm1.AsyncCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
  ThreadSafeLog( 'AsyncCopyFiles - ENTER' );
  Result := TTask.Run(
    procedure
    begin
      CopyFiles( AFiles, ADestPath, Overwrite );
      TThread.Synchronize( nil, ACallback );
    end );
  ThreadSafeLog( 'AsyncCopyFiles - EXIT' );
end;

function TForm1.AsyncParallelCopyFiles( AFiles: TStringDynArray; ADestPath: string; Overwrite: Boolean; ACallback: TThreadProcedure ): ITask;
begin
  ThreadSafeLog( 'AsyncParallelCopyFiles - ENTER' );
  Result := TTask.Run(
    procedure
    begin
      ParallelCopyFiles( AFiles, ADestPath, Overwrite );
      TThread.Synchronize( nil, ACallback );
    end );
  ThreadSafeLog( 'AsyncParallelCopyFiles - EXIT' );
end;

procedure TForm1.Button1Click( Sender: TObject );
var
  LFiles: TStringDynArray;
  LDestPath: string;
begin
  ListBox1.Clear; // Clear the log destination

  LFiles := TDirectory.GetFiles( TPath.GetDocumentsPath, '*.*' );
  LDestPath := TPath.Combine( TPath.GetDocumentsPath, '_COPYTEST_' );
  TDirectory.CreateDirectory( LDestPath );

  case RadioGroup1.ItemIndex of
    0:
      CopyFiles( LFiles, LDestPath, True );
    1:
      ParallelCopyFiles( LFiles, LDestPath, True );
    2:
      begin
        Button1.Enabled := False;
        AsyncCopyFiles( LFiles, LDestPath, True,
          procedure
          begin
            Button1.Enabled := True;
          end );
      end;
    3:
      begin
        Button1.Enabled := False;
        AsyncParallelCopyFiles( LFiles, LDestPath, True,
          procedure
          begin
            Button1.Enabled := True;
          end );
      end;
  end;
end;

{ TLogMsg }

constructor TLogMsg.Create( const AMsg: string );
begin
  FMsg := AMsg;
  FThreadID := TThread.CurrentThread.ThreadID;
  FOccurred := Now;
end;

class operator TLogMsg.implicit( a: string ): TLogMsg;
begin
  Result := TLogMsg.Create( a );
end;

class operator TLogMsg.implicit( a: TLogMsg ): string;
begin
  Result := a.ToString;
end;

function TLogMsg.ToString: string;
begin
  Result := Format( '$%8.8x [%s] %s', [FThreadID, FormatDateTime( 'hh:nn:ss.zzz', FOccurred ), FMsg] );
end;

end.

อัปเดตแล้ว

ฉันเพิ่งขยายข้อความบันทึกด้วยข้อมูลเพิ่มเติมเกี่ยวกับเธรดและเวลาที่เกิดขึ้นของข้อความ

person Sir Rufo    schedule 10.03.2015
comment
คุณแน่ใจเกี่ยวกับการหยุดชะงักหรือไม่ หาก Parallel.For เริ่มต้นจากเธรดหลักพร้อมการเรียกให้ซิงโครไนซ์ ฉันไม่ได้ใช้ไลบรารีเธรด แต่การใช้งานของฉันเองทำให้โอเวอร์เฮดของ Parallel.For อยู่ในเธรดของตัวเอง - person LU RD; 10.03.2015
comment
@LURD นั่นคือเหตุผลของสวิตช์คอมไพเลอร์ คุณจะเกิดการหยุดชะงักและฉันจะแปลกใจมากถ้าไม่เป็นเช่นนั้น คุณจะซิงโครไนซ์เธรดกับเธรดหลักได้อย่างไร หากเธรดหลักกำลังรอให้เธรดนี้เสร็จสิ้น คุณไม่สามารถ - person Sir Rufo; 11.03.2015
comment
@LURD TParallel.ForWorker ใน PPL สร้างงานแยกต่างหากสำหรับ wotrk จริง แต่ลงท้ายด้วย RootTask.Start.Wait() ดังนั้น TParallel.For จึงเป็นฟังก์ชันการบล็อก - person iPath ツ; 11.03.2015
comment
@SirRufo +100 สำหรับคำอธิบายและตัวอย่าง ฉันคิดว่าโพสต์ของคุณควรเลือกเป็นคำตอบ :) - person iPath ツ; 11.03.2015
comment
@ iPathツ ใช่ฉันค้นหาแล้วและยืนยันการหยุดชะงัก ฉันคาดหวังว่าจะต้องรอด้วยการโทรไปยัง CheckSynchronize หรือที่คล้ายกันเพื่อหลีกเลี่ยงการหยุดชะงัก แต่ไม่มีสิ่งนั้น - person LU RD; 11.03.2015
comment
ขอบคุณสำหรับคำอธิบายที่มีความคิดเห็นครบถ้วนว่าทำไมฉันถึงมีการหยุดชะงักใน TParallel ของฉันสำหรับ sleeped loop ^^ - person Darkendorf; 24.12.2015
comment
@SirRufo คุณได้ ThreadSafeLog กำหนดเป็น procedure TForm1.ThreadSafeLog( ALogMsg: TLogMsg ); จากนั้นต่อมาคุณเรียกมันว่า ThreadSafeLog( 'ParallelCopyFiles - EXIT' ); มันคอมไพล์หรือเปล่า? - person Interface Unknown; 03.07.2017
comment
@InterfaceUnknown ใช่ มันคอมไพล์ได้ - person Sir Rufo; 03.07.2017
comment
@เซอร์รูโฟ ขอบคุณ ฉันอาจจะพลาดอะไรบางอย่างไป... จะได้เรียนรู้โค้ดให้ลึกซึ้งยิ่งขึ้น - person Interface Unknown; 03.07.2017
comment
@InterfaceUnknown ความมหัศจรรย์จะเกิดขึ้นเนื่องจากตัวดำเนินการโดยนัยของ TLogMsg - person Sir Rufo; 03.07.2017