Thread-Safe Blocking Data Queue for Delphi |
|
conundrum
尊榮會員 發表:893 回覆:1272 積分:643 註冊:2004-01-06 發送簡訊給我 |
Thread-Safe Blocking Data Queue for Delphi http://www.undu.com/Articles/990502e.html by K.G. Skaug - k.skaug@satserv.nl In applications which process large amounts of data at high speed, multithreading can help smooth out the load by splitting the required processing in chunks which are handled by dedicated threads. It is a common feature for such threads to pass data to each other for further processing; however, you generally don't want to do this in a synchronous manner because this takes away most of the advantage of multithreading. In order to let the threads operate independently as far as possible, you need data buffers where each thread can fetch and deposit its processing material without worrying about what other threads might be doing at the same time. Let me illustrate with an example: Suppose mr. A, B and C are cooking onion soup. A and B have volunteered to peel the onions, while C cuts the onions and dumps the slices into the boiling water. At regular intervals, one of them (in particular C) has to leave the room to soothe his watering eyes. This severely hampers the overall progress, because if C leaves, A and B can't get rid of their peeled onions; if either A or B leaves, then C has a suboptimal peeled onion supply rate. Also, C can only handle one onion at a time, so if both A and B have a peeled onion ready, one of them will have to wait. Conversely, when he's not busy slicing, C finds himself nagging at A and B to get him more onions. It's one big bottleneck situation. Now here comes mrs. D with a big nice bowl for A and B to put their peeled onions in. Everyone is happy, because no-one has to wait for other people (except for the dinner guests, who still have to wait some time for their soup!). However, we soon run into a new problem: C is so happy with the bowl that he forgets about his watering eyes.He carries on slicing for a good 10 minutes and OOPS - suddenly the bowl is empty! A and B had to go to fetch a new batch of onions in the basement. However, this is where our bowl truly shows its value: Noticing the shock and disappointment on C's face, the bowl mumbles reassuringly: "Don't worry about it, mr. C, just lay back and take a nap, and I'll wake you up when I have another onion for you!". The analogy is accurate, even if this last part perhaps is a bit surrealistic. You guessed it, the "bowl" is my data buffer. Let's leave the world of onion soup for now and take a look at this buffer idea. What I wanted was the following: • A generalised FIFO (First In First Out) buffer • which could be used for virtually any type of data (e.g. onions and potatos in the same bowl) • accessed by virtually any number of threads simultaneously (reading or writing) • automatically block on a read (or write) call to the buffer if the buffer is empty (or full) After some days of experimentation with different techniques I decided on an approach using a combination of critical sections and WaitForMultipleObjects. The resulting class was called TGeneralQueue, although of course I can think of even more generalised schemes. The queue sports the following main features: • A generic thread-safe FIFO data buffer • Maximum item count (queue capacity) determined at run-time. • Dynamic data entry size: the items passed are actually pointers to data blocks which are allocated by the • writing object/thread and deallocated by the reading object/thread. • Additionally, for each pointer a 32-bit integer is passed to indicate the size (in bytes) of the memory block • associated with the queued pointer. Usage of this parameter is not mandatory for the proper functioning of the • queue, so if e.g. fixed or otherwise known-size structures are used, the length tag can be ignored. • Totally safe multithreaded access: Several threads can write to and read from the queue at the same time. The • queue has been tested successfully with 6 parallel accessing threads (3 writers, 3 readers). • Efficient: Access time for single write/single read thread is order of 20us on a P/166 under Windows NT4.0 • Powerful blocking capacity on the read and write functions: Depending on the calling parameters, it is • possible to block the read function if the queue is empty; similarly, the write function can be blocked if the • queue is full. Blocking forces the calling thread to wait (sleep) until the queue is no longer empty or full • (respectively); in this manner it can be guaranteed that the read and write functions always succeed. • If total blocking is not desired (for processing reasons, e.g. if the call comes from the main VCL thread), it is • possible to block for a user-defined timeout (optional). • It is also possible to enforce non-blocking direct access to the queue (still thread-safe); then the calling thread • is responsible for checking the empty/full status of the queue before reading or writing. Note in particular that since it is possible in principle to block infinitely in the queue, you need special handling when you want to destroy the queue; otherwise any threads which are blocked in the queue will burst out and cause nasty errors (imagine someone smashing the onion bowl while C had his head leaned against it...). Therefore a special UnBlockAccess method was added to kick out sleeping threads from the queue before you destroy it. For me, this queue truly unlocked the world of parallel multithreaded data processing. I hope it will be useful to some of you. Source code (Delphi 3) and usage instructions are included. If you have comments, problems, or suggestions for improvement I'd be very glad to hear them! Enjoy! Kristofer Skaug Enclosed: TGeneralQueue Source Code: // // RECOMMENDED USAGE INSTRUCTIONS: // =============================== // // // in Main thread: MyQueue should be a global variable // MyQueue:=TGeneralQueue.Create(Capacity); // // [ now create read and write threads, as many as you want ] // // // in Writing/server thread: // while not Terminated do // begin // [other processing] // dLength:=[length of data block allocated to pointer]; // pData:=AllocMem(dLength); // try // tmp:=MyQueue.WriteItem(pData,dLength,INFINITE); // finally // if (tmp<>BUFF_NO_ERROR) then Terminate; // or somesuch // end; // if not Terminated then // begin ... end;// [more processing] // end; // of Write thread main execution loop // // // in Reading/Client thread: // while not Terminated do // begin // [other processing] // try // tmp:=MyQueue.ReadItem(pData,dLength,INFINITE); // finally // if ((tmp=BUFF_NO_ERROR) and (not Terminated)) then // begin // [do something useful with pData, e.g.:] // CopyMemory(@stringbuffer,pData,dLength); // FreeMem(pData); // This one is VERY IMPORTANT! // // NOTE: FreeMem matches each AllocMem in WriteThread // end // else Terminate; // or somesuch // end; // if not Terminated then // begin ... end;// [more processing] // end; // of Read thread main execution loop // // // in Main Thread, to close down the system: // [Terminate Write Thread] // make sure it's not suspended! // [Terminate Read Thread] // make sure it's not suspended! // MyQueue.UnBlockAccess; // releases WriteThread/ReadThread if blocked // MyQueue.Destroy; // unit GenQueue; interface uses Windows, SysUtils; const MAX_QUEUE_SIZE = 10000; // Maximum possible size of queue (nr. of items); // however actual instantiated size is determined // in run-time by a parameter to the constructor. // ERROR CODES RETURNED BY ADDITEM AND READITEM FUNCTIONS: const BUFF_NO_ERROR = 0; // Function succeeded BUFF_ERR_INVALID_POINTER = -1; // A NULL pointer was received or retrieved BUFF_ERR_CLOSING = -2; // Closedown event aborted block for Add/Read BUFF_ERR_WRITE_TIMEOUT = -4; // Timeout on block in AddItem BUFF_ERR_READ_TIMEOUT = -5; // Timeout on block in ReadItem BUFF_ERR_UNSAFE_WRITE = -6; // Unknown return code from WaitForMultipleObjects/write BUFF_ERR_UNSAFE_READ = -7; // Unknown return code from WaitForMultipleObjects/read BUFF_ERR_WRITE_FULL = -8; // Buffer turned out to be full when written to BUFF_ERR_READ_EMPTY = -9; // Buffer turned out to be empty when read type TQueuedItem = record pData: pointer; // this is the "user data" pointer, created/freed externally dataLength: integer; // this is the size (bytes) of data block in pData (optional usage) end; TQueueArray = array[0..MAX_QUEUE_SIZE] of TQueuedItem; pQueueArray = ^TQueueArray; type TGeneralQueue = class private // Private fields: ReadWriteCS: TRTLCriticalSection; // Protective Critical Section object pCanWrite,pCanRead: PWOHandleArray; // Event object arrays; WriteCount,ReadCount: integer; // "Semaphores" for counting blocked threads WritePointer,ReadPointer: integer; // Slot in pQueue[] to write to/read from next queuedItems: LongInt; // Current number of items in queue maxItems: LongInt; // Max items in queue (determined in Create) pQueue: pQueueArray; // It's a pointer to a variable-size array of TQueuedItem // Private functions/procs: function internalWriteItem(pData:pointer; dataLength:integer): LongInt; function internalReadItem(var pData: pointer; var dataLength: integer): LongInt; procedure internalFlush; public // Here is the public interface: constructor Create(maxItemsInQueue: LongInt); destructor Destroy; override; function WriteItem(pData:pointer; dataLength:integer; TimeOut: dword): LongInt; function ReadItem(var pData: pointer; var dataLength: integer; TimeOut: dword): LongInt; procedure Flush; procedure UnBlockAccess; // to be called only once at end of usage function ItemsInQueue: LongInt; function Full: boolean; function Empty: boolean; end; // class TGeneralQueue declaration implementation //****************************************************************************** // Function Name: TGeneralQueue.Create // Input Parameters: maxItemsInQueue: Queue Capacity; must be >0 // Return Value: None. // Side Effects: .. // Conditions: None. // Description: - Creates Queue object // - Creates internal Critical section object ReadWriteCS // - Initializes internal fields // - Creates internal control events // - Allocates memory for the pQueue pointer according to the // specified maximum number of items in the queue. // Notes: None. //****************************************************************************** constructor TGeneralQueue.Create(maxItemsInQueue: LongInt); begin // Create critical section object: InitializeCriticalSection(ReadWriteCS); // Initializing internal variables: Writepointer:=0; Readpointer:=0; queuedItems:=0; WriteCount:=0; ReadCount:=0; maxItems:=maxItemsInQueue; // Creating Events: pCanRead:=AllocMem(2*sizeof(THandle)); pCanRead[0]:=CreateEvent(nil,False,False,nil); // This is the "not Empty" event; Auto-Reset pCanRead[1]:=CreateEvent(nil,True,False,nil); // This is "Queue Invalidated" event; it's Manual-Reset pCanWrite:=AllocMem(2*sizeof(THandle)); pCanWrite[0]:=CreateEvent(nil,False,False,nil); // Auto-Reset; signaled by default pCanWrite[1]:=CreateEvent(nil,True,False,nil); // This is "Queue Invalidated" event; Manual-reset // Allocating Queue array: pQueue:=AllocMem(maxItems*sizeof(TQueuedItem)); // determines actual size of queue! end; // constructor Create //****************************************************************************** // Function Name: TGeneralQueue.Destroy // Input Parameters: None. // Return Value: None. // Side Effects: Flushes queue // Conditions: None. // Description: Flushes the Queue object, deletes the Critical section // object, Closes the Event handles, frees the pQueue pointer // and destroys the Queue object. // Notes: If the read/write threads have not already been killed, // this destructor should be preceded by a call to the proc // UnBlockAccess so that all threads are released from the // wait functions; otherwise they access an invalid object // when or if they do come out of the wait function. //****************************************************************************** destructor TGeneralQueue.Destroy; begin EnterCriticalSection(ReadWriteCS); try Flush; // must be public Flush not intFlush, otherwise get AV... don't know why! CloseHandle(pCanRead[0]); CloseHandle(pCanRead[1]); // also closes pCanWrite[1] CloseHandle(pCanWrite[0]); CloseHandle(pCanWrite[1]); FreeMem(pCanRead); FreeMem(pCanWrite); FreeMem(pQueue); finally LeaveCriticalSection(ReadWriteCS); end; DeleteCriticalSection(ReadWriteCS); inherited; // destroy object... don't know if this call does anything at all end; // destructor Destroy //****************************************************************************** // Function Name: TGeneralQueue.UnBlockAccess // Input Parameters: None. // Return Value: None. // Side Effects: Any waiting threads in the WaitForMultipleObjects functions // in ReadItem and WriteItem are released and exit the queue. // Conditions: None. // Description: Sets the "Queue Termination" events, which are detected in // the WaitForMultipleObjects functions in WriteItem/ReadItem. // Then enters a loop which exits only when all Read threads // and Write threads have been released from the wait function // Notes: None. //****************************************************************************** procedure TGeneralQueue.UnBlockAccess; var ReadThreadsHanging,WriteThreadsHanging: boolean; begin WriteThreadsHanging:=True; ReadThreadsHanging:=True; SetEvent(pCanWrite[1]); SetEvent(pCanRead[1]); Sleep(10); while WriteThreadsHanging do begin EnterCriticalSection(ReadWriteCS); // avoid AV with Write threads WriteThreadsHanging:=(WriteCount>0); LeaveCriticalSection(ReadWriteCS); Sleep(10); end; while ReadThreadsHanging do begin EnterCriticalSection(ReadWriteCS); // avoid AV with Read threads ReadThreadsHanging:=(ReadCount>0); LeaveCriticalSection(ReadWriteCS); Sleep(10); end; end; // proc UnBlockAccess //****************************************************************************** // Function Name: TGeneralQueue.Flush // Input Parameters: None. // Return Value: None. // Side Effects: None. // Conditions: None. // Description: Public thread-safe interface to internalFlush // Notes: None. //****************************************************************************** procedure TGeneralQueue.Flush; begin EnterCriticalSection(ReadWriteCS); try internalFlush; finally LeaveCriticalSection(ReadWriteCS); end; end; // proc Flush //****************************************************************************** // Function Name: TGeneralQueue.internalFlush // Input Parameters: None. // Return Value: None. // Side Effects: None. // Conditions: None. // Description: Flushes the Queue // Notes: None. //****************************************************************************** procedure TGeneralQueue.internalFlush; var pData: pointer; dLength,tmp: integer; begin while (queuedItems<>0) do begin tmp:=internalReadItem(pData,dLength); FreeMem(pData); pData:=nil; end; end; // proc internalFlush //****************************************************************************** // Function Name: TGeneralQueue.WriteItem // Input Parameters: pData:pointer // dataLength: number of bytes allocated to pData // TimeOut: Length of time (ms) to block if queue is full. // 0: No blocking => caller is responsible for full check // 1-$FFFFFFFE: Timeout period for blocking // INFINITE: Infinite blocking: Guarantees valid return // Return value: QUEUE_ERR_X error code. // Side Effects: None. // Conditions: None. // Description: Adds a pointer to the tail of the Queue. // WAITS for SlotFreeForWrite event if buffer is initially // empty, so that calling thread sleeps (blocking call). // Notes: Public interface to coordinateAddItem->internalAddItem //****************************************************************************** function TGeneralQueue.WriteItem(pData:pointer; dataLength:integer; TimeOut: dword): LongInt; var WaitResult,tmp: LongInt; begin tmp:=BUFF_NO_ERROR; try EnterCriticalSection(ReadWriteCS); while ((queuedItems>=maxItems) and (TimeOut<>0)) do // buffer full, wait for notFull event begin Inc(WriteCount); LeaveCriticalSection(ReadWriteCS); tmp:=BUFF_ERR_UNSAFE_WRITE; // unknown reason for releasing! WaitResult:=WaitForMultipleObjects(2, // number of event objects in pCanWrite pCanWrite,// event objects to wait for False, // either one will release a single thread TimeOut); // timeout to wait for objects EnterCriticalSection(ReadWriteCS); Dec(WriteCount); case WaitResult of WAIT_OBJECT_0: tmp:=BUFF_NO_ERROR; WAIT_OBJECT_0 1: begin tmp:=BUFF_ERR_CLOSING; // the UnBlockThreads() method was called break; // force exit of while loop end; WAIT_TIMEOUT: begin tmp:=BUFF_ERR_WRITE_TIMEOUT; // timeout expired; break; // force exit of while loop end; else break; // force exit of while loop end; // case WaitResult end; // if (do wait) if (tmp=BUFF_NO_ERROR) then tmp:=internalWriteItem(pData,dataLength); finally LeaveCriticalSection(ReadWriteCS); Result:=tmp; end; end; // function WriteItem //****************************************************************************** // Function Name: TGeneralQueue.internalWriteItem // Input Parameters: pData:pointer // dataLength: number of bytes allocated to pData // Side Effects: None. // Conditions: None. // Description: Adds a pointer to the tail of the Queue. // Notes: The object that takes the pointer from the Queue will be // responsible for freeing the memory associated with the // object that the pointer points to! // The calling object is also responsible for assuring that the // Queue isn't full! (Use TQueue.Full) //****************************************************************************** function TGeneralQueue.internalWriteItem(pData:pointer; dataLength:integer): LongInt; var tmp: LongInt; begin tmp:=BUFF_NO_ERROR; try if (queuedItems>=maxItems) then tmp:=BUFF_ERR_WRITE_FULL else if (pData=nil) then tmp:=BUFF_ERR_INVALID_POINTER else begin Inc(WritePointer); if (WritePointer=maxItems) then WritePointer:=0; // writepointer range = 0..maxItems-1 pQueue[WritePointer].pData:=pData; pQueue[WritePointer].dataLength:=dataLength; Inc(queuedItems); ResetEvent(pCanWrite[0]); if (queuedItems>0) then SetEvent(pCanRead[0]); // signals that ONE Read thread can be released from wait end; finally Result:=tmp; end; // try-finally end; // function internalWriteItem //****************************************************************************** // Function Name: TGeneralQueue.ReadItem // Input Parameters: TimeOut: User-specified timeout for blocking function: // 0: No blocking, proceed directly to coordinateReadItem // 1..(INFINITE-1): Timeout interval in ms to block // INFINITE ($FFFFFFFF): Indefinite blocking // Blocking occurs if queue is empty at the time of call. // Return Value: dataLength: number of bytes allocated to pointer // pointer // Side Effects: None. // Conditions: None. // Description: Removes a pointer from the head of the Queue. // WAITS for ItemAvailableForRead event if queue is initially // empty, so that calling thread sleeps (blocking call). // Notes: Public interface for reading from the queue. //****************************************************************************** function TGeneralQueue.ReadItem(var pData:pointer; var dataLength:integer; TimeOut:dword): LongInt; var WaitResult,tmp: LongInt; begin tmp:=BUFF_NO_ERROR; try EnterCriticalSection(ReadWriteCS); while ((queuedItems=0) and (TimeOut<>0)) do // buffer empty, wait for notEmpty event begin Inc(ReadCount); LeaveCriticalSection(ReadWriteCS); tmp:=BUFF_ERR_UNSAFE_READ; // unknown reason for releasing! WaitResult:=WaitForMultipleObjects(2, // number of event objects in pCanWrite pCanRead, // event objects to wait for False, // either one will release a single thread TimeOut); // timeout to wait for objects EnterCriticalSection(ReadWriteCS); Dec(ReadCount); case WaitResult of WAIT_OBJECT_0: tmp:=BUFF_NO_ERROR; // go back to top of loop to check if buffer is empty WAIT_OBJECT_0 1: begin tmp:=BUFF_ERR_CLOSING; // the UnBlockThreads() method was called break; // force exit from while loop end; WAIT_TIMEOUT: begin tmp:=BUFF_ERR_READ_TIMEOUT; // timeout expired; break; // force exit from while loop end; else break; // force exit from while loop end; // case WaitResult end; // if (do wait) if (tmp=BUFF_NO_ERROR) then tmp:=internalReadItem(pData,dataLength); finally LeaveCriticalSection(ReadWriteCS); Result:=tmp; end; end; // function ReadItem //****************************************************************************** // Function Name: TGeneralQueue.internalReadItem // Input Parameters: None. // Output Params: pData: Allocated queued pointer // dataLength: number of bytes allocated to pointer; // Return Value: BUFF_ERR_X Result code // Side Effects: None. // Conditions: None. // Description: Removes a pointer from the head of the queue. // Notes: The receiver of the pointer is responsible for freeing the // memory associated with the object that the pointer points to! // The calling object is also responsible for assuring that the // queue isn't empty! //****************************************************************************** function TGeneralQueue.internalReadItem(var pData: pointer; var dataLength: integer): LongInt; var tmp: LongInt; begin tmp:=BUFF_NO_ERROR; try if (queuedItems=0) then tmp:=BUFF_ERR_READ_EMPTY else begin Inc(ReadPointer); if (ReadPointer=maxItems) then ReadPointer:=0; dataLength:=pQueue[ReadPointer].dataLength; pData:=pQueue[ReadPointer].pData; if (pData=nil) then tmp:=BUFF_ERR_INVALID_POINTER; // should do something more here! Dec(queuedItems); //if (queuedItems |
本站聲明 |
1. 本論壇為無營利行為之開放平台,所有文章都是由網友自行張貼,如牽涉到法律糾紛一切與本站無關。 2. 假如網友發表之內容涉及侵權,而損及您的利益,請立即通知版主刪除。 3. 請勿批評中華民國元首及政府或批評各政黨,是藍是綠本站無權干涉,但這裡不是政治性論壇! |