Bug 162478

Summary: Add SynchronizedFixedQueue class
Product: WebKit Reporter: Said Abou-Hallawa <sabouhallawa>
Component: New BugsAssignee: Said Abou-Hallawa <sabouhallawa>
Status: RESOLVED FIXED    
Severity: Normal CC: benjamin, cdumez, cmarcelo, commit-queue, dbates, fpizlo, ggaren, sam, simon.fraser, thorton
Priority: P2    
Version: WebKit Nightly Build   
Hardware: Unspecified   
OS: Unspecified   
Bug Depends on:    
Bug Blocks: 155322, 155546    
Attachments:
Description Flags
Patch
none
Patch
none
Patch
none
Patch
none
Patch
none
Patch
none
Patch
none
Patch
none
Patch
none
Patch none

Said Abou-Hallawa
Reported 2016-09-22 18:15:50 PDT
The purpose of this class is to synchronize safely enqueuing and dequeuing elements form a FixedQueue. This class can be used in implementing a producer-consumer pattern.
Attachments
Patch (17.49 KB, patch)
2016-09-22 18:30 PDT, Said Abou-Hallawa
no flags
Patch (17.50 KB, patch)
2016-09-22 18:32 PDT, Said Abou-Hallawa
no flags
Patch (33.13 KB, patch)
2016-09-23 12:26 PDT, Said Abou-Hallawa
no flags
Patch (33.12 KB, patch)
2016-09-23 12:57 PDT, Said Abou-Hallawa
no flags
Patch (33.18 KB, patch)
2016-09-23 13:19 PDT, Said Abou-Hallawa
no flags
Patch (26.97 KB, patch)
2016-09-30 17:50 PDT, Said Abou-Hallawa
no flags
Patch (24.18 KB, patch)
2016-10-04 13:27 PDT, Said Abou-Hallawa
no flags
Patch (24.03 KB, patch)
2016-10-05 10:18 PDT, Said Abou-Hallawa
no flags
Patch (22.19 KB, patch)
2016-10-07 10:51 PDT, Said Abou-Hallawa
no flags
Patch (22.21 KB, patch)
2016-10-11 11:40 PDT, Said Abou-Hallawa
no flags
Said Abou-Hallawa
Comment 1 2016-09-22 18:30:01 PDT
Said Abou-Hallawa
Comment 2 2016-09-22 18:32:46 PDT
Alexey Proskuryakov
Comment 3 2016-09-22 20:21:54 PDT
Safe is a very overloaded word, so it doesn't really explain the difference. Would something like SynchronizefSafeQueue be accurate?
Alexey Proskuryakov
Comment 4 2016-09-22 20:22:45 PDT
I meant "Synchronized".
Simon Fraser (smfr)
Comment 5 2016-09-23 10:55:50 PDT
Comment on attachment 289643 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review > Source/WTF/wtf/Broadcast.h:35 > +class Broadcast { I think this name needs to be improved. What is this broadcasting? It seems to be a range? > Source/WTF/wtf/FixedQueue.h:31 > +class FixedQueue { Fixed in what sense? Fixed size? > Source/WTF/wtf/SafeFixedQueue.h:36 > +class SafeFixedQueue : public FixedQueue<T, BufferSize> { Safe in what sense?
Geoffrey Garen
Comment 6 2016-09-23 12:04:55 PDT
Comment on attachment 289643 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review > Source/WTF/wtf/Broadcast.h:102 > + void decrement() > + { > + waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); }); > + } > + > + void increment() > + { > + waitUntilAndNotify([this]() { return m_isShutdown || (m_value < m_maxValue && ++m_value); }); > + } It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky. > Source/WTF/wtf/FixedQueue.h:44 > + if (isFull()) > + return false; I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames. Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure. > Source/WTF/wtf/FixedQueue.h:87 > + size_t count = 1; > + for (size_t head = m_head; head != m_tail; head = (head + 1) % BufferSize) > + ++count; I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag. > Source/WTF/wtf/FixedQueue.h:101 > +using WTF::FixedQueue; Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc? Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.
Said Abou-Hallawa
Comment 7 2016-09-23 12:26:51 PDT
Said Abou-Hallawa
Comment 8 2016-09-23 12:44:04 PDT
Comment on attachment 289643 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review >> Source/WTF/wtf/Broadcast.h:35 >> +class Broadcast { > > I think this name needs to be improved. What is this broadcasting? It seems to be a range? This is what Geoff suggested here: https://bugs.webkit.org/attachment.cgi?id=287822&action=review. But I will try to come up with a better name. >> Source/WTF/wtf/Broadcast.h:102 >> + } > > It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky. I did not understand where the problem is. Is it in how the predicate is written? Or is it in the concept itself? Did not you like having to wait for a predicate and at the same time change the value of a member once the predicate is true? >> Source/WTF/wtf/FixedQueue.h:31 >> +class FixedQueue { > > Fixed in what sense? Fixed size? This is what Geoff suggested here: https://bugs.webkit.org/attachment.cgi?id=287822&action=review. But I will try to come up with a better name. >> Source/WTF/wtf/FixedQueue.h:44 >> + return false; > > I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames. > > Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure. I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following: if (m_queue.isFull()) m_queue.dequeue(); m_queue.enqueue(); >> Source/WTF/wtf/FixedQueue.h:87 >> + ++count; > > I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag. Done. >> Source/WTF/wtf/FixedQueue.h:101 >> +using WTF::FixedQueue; > > Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc? > > Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code. This is fine too. But the suggestions so far are just optimizations and the class is not a big deal. For the size() function I have to admit I was lazy writing it because I added it only for testing. >> Source/WTF/wtf/SafeFixedQueue.h:36 >> +class SafeFixedQueue : public FixedQueue<T, BufferSize> { > > Safe in what sense? I changed it to SynchronizedFixedQueue as Alexey suggested in https://bugs.webkit.org/show_bug.cgi?id=162478#c3.
Said Abou-Hallawa
Comment 9 2016-09-23 12:57:11 PDT
Said Abou-Hallawa
Comment 10 2016-09-23 13:19:18 PDT
Simon Fraser (smfr)
Comment 11 2016-09-23 13:25:15 PDT
Comment on attachment 289697 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=289697&action=review > Source/WTF/wtf/Broadcast.h:35 > +class Broadcast { I could call this ThreadsafeCounter or something. > Source/WTF/wtf/Broadcast.h:78 > + m_condition.notifyAll(); Should you notify if the value didn't change? > Source/WTF/wtf/Broadcast.h:94 > + void decrement() I think this and increment() need better names, that indicate their waiting/blocking nature. > Source/WTF/wtf/FixedQueue.h:31 > +class FixedQueue { Why not CircularQueue? > Source/WTF/wtf/FixedQueue.h:90 > + size_t m_head { 0 }; > + size_t m_tail { 0 }; Call these m_headIndex and m_tailIndex. > Source/WTF/wtf/SynchronizedFixedQueue.h:89 > + Broadcast<int> m_empty; > + Broadcast<int> m_full; These names don't tell me what these do. And why do you need both, and not just a single thing that tracks the fullness level?
Geoffrey Garen
Comment 12 2016-09-23 14:13:09 PDT
> > Source/WTF/wtf/Broadcast.h:35 > > +class Broadcast { > > I could call this ThreadsafeCounter or something. Some more options: Watchable<T>, AtomicWatchable<T>, Observable<T>, AtomicObservable<T>. Or, if we like "Counter" in the name: WatchableCounter<T>, AtomicWatchableCounter<T>, ObservableCounter<T>, AtomicObservableCounter<T>. (Note that the class is not only thread safe; it also wakes up other threads to signal its changes.) I suppose, since this is just a counter, there's no point in the template parameter. We can just make the data size_t. The template abstraction would be justified if we removed the explicit increment() and decrement() functions and we just provided functions for "read the data", "wait until the data changes", and "write the data". > > It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky. > I did not understand where the problem is. Is it in how the predicate is written? Or > is it in the concept itself? Did not you like having to wait for a predicate and at > the same time change the value of a member once the predicate is true? The waitUntil function declares that it takes a "predicate" argument. A "predicate" in logic is "something that is affirmed or denied concerning an argument of a proposition". It is very surprising for a function that is supposed to answer a yes/no question to have side-effects, and it is stranger still for those side-effects to potentially change the answer to the yes/no question. Consider this conversation: Me: Are you going to the ballgame? You: Because you asked me that, I have adopted a kitten. Me: ? You: And, now that I own a kitten, I'm too busy to go the ballgame. Me: ??? In fact, I just realized that this is a hard error. Condition variables are allowed to invoke their predicates spuriously, and in this case that would cause multiple increment or multiple decrement. > > Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure. > I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following: OK, but do we have any callers that want the behavior where new items are discarded? If we're writing a data structure abstraction, it should cater to our use case. > > Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc? > > > > Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code. > This is fine too. But the suggestions so far are just optimizations and the class is not a big deal. For the size() function I have to admit I was lazy writing it because I added it only for testing. Is there any benefit to writing this data structure for ourselves? Put otherwise: What was your motivation in writing this data structure?
Simon Fraser (smfr)
Comment 13 2016-09-23 14:54:50 PDT
Comment on attachment 289700 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=289700&action=review > Source/WTF/wtf/Broadcast.h:96 > + waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); }); This condition is wrong; m_value-- should check against the min. > Source/WTF/wtf/SynchronizedFixedQueue.h:60 > + m_empty.decrement(); Wait for slot to become available. > Source/WTF/wtf/SynchronizedFixedQueue.h:67 > + m_full.increment(); Notify that a slot has become filled. > Source/WTF/wtf/SynchronizedFixedQueue.h:74 > + m_full.decrement(); Wait for a full slot to be dequeable > Source/WTF/wtf/SynchronizedFixedQueue.h:83 > + m_empty.increment(); Notify that a slot is now empty > Source/WTF/wtf/SynchronizedFixedQueue.h:89 > + Broadcast<int> m_empty; > + Broadcast<int> m_full; Names should reflect these are about empty and full slots.
Geoffrey Garen
Comment 14 2016-09-30 13:27:33 PDT
Comment on attachment 289700 [details] Patch Can SynchronizedFixedQueue just use a single lock and condition variable instead of three? I believe that enqueue can lock, check for shutdown, enqueue, and then broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and then dequeue. That's one lock and condition variable instead of three, and two fewer state variables (m_empty and m_full). Why does SynchronizedFixedQueue::dequeue invoke its functor while holding the queue lock? That means I can't write an algorithm that optionally dequeues a second item because I will deadlock.
Said Abou-Hallawa
Comment 15 2016-09-30 17:50:13 PDT
Said Abou-Hallawa
Comment 16 2016-09-30 18:23:24 PDT
(In reply to comment #14) > Comment on attachment 289700 [details] > Patch > > Can SynchronizedFixedQueue just use a single lock and condition variable > instead of three? > > I believe that enqueue can lock, check for shutdown, enqueue, and then > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and > then dequeue. That's one lock and condition variable instead of three, and > two fewer state variables (m_empty and m_full). > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding > the queue lock? That means I can't write an algorithm that optionally > dequeues a second item because I will deadlock. I cleaned the patch a little. I removed the FixedQueue template. I made SynchronizedFixedQueue holds to a data member of type Deque instead. I changed SynchronizedFixedQueue::dequeue(), as you suggested, to not hold the lock while processing the item. I also replaced Broadcast by a template class called Watchable<T> and a class called WatchableCounter which is derived from Watchable<size_t>.
Filip Pizlo
Comment 17 2016-10-03 12:09:03 PDT
Comment on attachment 290413 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review > Source/WTF/ChangeLog:17 > + 3. SafeFixedQueue: A fixed-size queue but its enqueue() and dequeue() > + are controlled by two WatchableCounter: m_empty and m_full. As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction. I would have written it directly using a condition variable for empty and a condition variable for full. I think that would have resulted in less code, and so fewer things that could go wrong. On the other hand, I grok this code, so my objection is soft.
Said Abou-Hallawa
Comment 18 2016-10-03 12:32:15 PDT
(In reply to comment #14) > Comment on attachment 289700 [details] > Patch > > Can SynchronizedFixedQueue just use a single lock and condition variable > instead of three? > > I believe that enqueue can lock, check for shutdown, enqueue, and then > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and > then dequeue. That's one lock and condition variable instead of three, and > two fewer state variables (m_empty and m_full). > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding > the queue lock? That means I can't write an algorithm that optionally > dequeues a second item because I will deadlock. If we use a single lock, that will make each thread waits for the other thread(s) to release the lock. For example, if a producer wants to insert a request in the queue it has to wait for the the consumer thread to finish even if the queue has empty spaces. I think we should make all threads compete for a single lock only when accessing the queue. But to allow two threads to proceed at the same time without getting incorrect value for the queue current count, two additional semaphores have to be used, one for the fullCounter and the other for the emptyCounter. One semaphore is used to block the thread until it can proceed and the other semaphore is used to notify the other thread with completion. Producer: m_emptyCounter.decrement(); // Blocks execution until there is an empty space. { LockHolder lockHolder(m_mutex); m_queue.append(value); } m_fullCounter.increment(); // Notify the other thread that there is an item for processing. Consumer: m_fullCounter.decrement(); // Blocks execution until there is an item to be processed. { LockHolder lockHolder(m_mutex); // Remove the first of the queue. value = m_queue.first(); m_queue.removeFirst(); } m_emptyCounter.increment(); // Notify the other threads that an item was removed from the queue.
Filip Pizlo
Comment 19 2016-10-03 12:47:17 PDT
(In reply to comment #18) > (In reply to comment #14) > > Comment on attachment 289700 [details] > > Patch > > > > Can SynchronizedFixedQueue just use a single lock and condition variable > > instead of three? > > > > I believe that enqueue can lock, check for shutdown, enqueue, and then > > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and > > then dequeue. That's one lock and condition variable instead of three, and > > two fewer state variables (m_empty and m_full). > > > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding > > the queue lock? That means I can't write an algorithm that optionally > > dequeues a second item because I will deadlock. > > If we use a single lock, that will make each thread waits for the other > thread(s) to release the lock. For example, if a producer wants to insert a > request in the queue it has to wait for the the consumer thread to finish > even if the queue has empty spaces. I think we should make all threads > compete for a single lock only when accessing the queue. I don't think you're getting any extra concurrency over what you'd get with just one lock. The queue lock is the bottleneck either way. > > But to allow two threads to proceed at the same time without getting > incorrect value for the queue current count, two additional semaphores have > to be used, one for the fullCounter and the other for the emptyCounter. One > semaphore is used to block the thread until it can proceed and the other > semaphore is used to notify the other thread with completion. > > Producer: > m_emptyCounter.decrement(); // Blocks execution until there is an empty > space. > { > LockHolder lockHolder(m_mutex); > m_queue.append(value); > } > m_fullCounter.increment(); // Notify the other thread that there is an > item for processing. > > > Consumer: > m_fullCounter.decrement(); // Blocks execution until there is an item to > be processed. > { > LockHolder lockHolder(m_mutex); > // Remove the first of the queue. > value = m_queue.first(); > m_queue.removeFirst(); > } > m_emptyCounter.increment(); // Notify the other threads that an item was > removed from the queue. Or you could just do this: template<typename T> class Queue { public: void enqueue(T value) { LockHolder holder(m_lock); while (m_data.size() >= limit) m_cond.wait(m_lock); m_data.append(value); m_cond.notifyAll(); } T dequeue() { LockHolder locker(m_lock); while (m_data.isEmpty()) m_cond.wait(m_lock); T result = m_data.takeFirst(); m_cond.notifyAll(); return result; } private: Deque<T> m_data; Lock m_lock; Condition m_condition; }; This has one lock, one condition variable, and will probably perform as well as your queue.
Filip Pizlo
Comment 20 2016-10-03 12:49:38 PDT
Comment on attachment 290413 [details] Patch I'm going to mark this r+ because I believe that the code is correct and it does everything that the changelog says it does. I suspect that this could be simplified, but I don't want my suspicion to block your work.
Geoffrey Garen
Comment 21 2016-10-03 13:07:01 PDT
Comment on attachment 290413 [details] Patch Here's a design for enqueue / dequeue that I think would work, with less complexity: (1) enqueue: LockHolder lockHolder(m_mutex); m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size() < BufferSize; }; if (m_isShutDown) return false; m_queue.append(value); m_condition.notifyAll(); return true; (2) dequeue: LockHolder lockHolder(m_mutex); m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size(); }; if (m_isShutDown) return false; value = m_queue.first(); m_queue.removeFirst(); m_condition.notifyAll(); return true;
Geoffrey Garen
Comment 22 2016-10-03 13:10:47 PDT
> If we use a single lock, that will make each thread waits for the other > thread(s) to release the lock. For example, if a producer wants to insert a > request in the queue it has to wait for the the consumer thread to finish > even if the queue has empty spaces. I think we should make all threads > compete for a single lock only when accessing the queue. This comment concerns me and demonstrates a problem in the current design that Simon ran into as well: By using semaphores, the current design implies that enqueue and dequeue can be concurrent, but in fact that isn't true, since they both acquire m_mutex. Perhaps semaphores could be used to achieve an alternative lock-free design, but that isn't what we have here.
JF Bastien
Comment 23 2016-10-03 14:18:38 PDT
Comment on attachment 290413 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review > Source/WTF/wtf/SynchronizedFixedQueue.h:107 > + Deque<T, BufferSize> m_queue; You probably want to separate members onto different cache lines depending on which you expect to be read / written by threads.
Filip Pizlo
Comment 24 2016-10-03 14:34:34 PDT
(In reply to comment #23) > Comment on attachment 290413 [details] > Patch > > View in context: > https://bugs.webkit.org/attachment.cgi?id=290413&action=review > > > Source/WTF/wtf/SynchronizedFixedQueue.h:107 > > + Deque<T, BufferSize> m_queue; > > You probably want to separate members onto different cache lines depending > on which you expect to be read / written by threads. Does that matter if the queue requires a single lock to protect everything?
Filip Pizlo
Comment 25 2016-10-03 14:35:56 PDT
(In reply to comment #22) > > If we use a single lock, that will make each thread waits for the other > > thread(s) to release the lock. For example, if a producer wants to insert a > > request in the queue it has to wait for the the consumer thread to finish > > even if the queue has empty spaces. I think we should make all threads > > compete for a single lock only when accessing the queue. > > This comment concerns me and demonstrates a problem in the current design > that Simon ran into as well: By using semaphores, the current design implies > that enqueue and dequeue can be concurrent, but in fact that isn't true, > since they both acquire m_mutex. > > Perhaps semaphores could be used to achieve an alternative lock-free design, > but that isn't what we have here. If it's using semaphores then it's probably not lock-free. :-P How much perf do we need here? Note that the JSC GC's queueing, which scales to ~8 cores, does not use lock-freedom and protects itself with one lock.
Geoffrey Garen
Comment 26 2016-10-03 15:22:30 PDT
> How much perf do we need here? I don't think this queue is going to be a bottleneck. It guards image decoding. So, the producer does a ~4ms decode and then enqueues and then the consumer does a dequeue while rendering a ~16ms frame. Meanwhile, this little queue takes like 100 microseconds max to lock and signal.
Said Abou-Hallawa
Comment 27 2016-10-04 13:27:08 PDT
Said Abou-Hallawa
Comment 28 2016-10-04 13:31:01 PDT
Comment on attachment 290413 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review >> Source/WTF/ChangeLog:17 >> + are controlled by two WatchableCounter: m_empty and m_full. > > As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction. I would have written it directly using a condition variable for empty and a condition variable for full. I think that would have resulted in less code, and so fewer things that could go wrong. > > On the other hand, I grok this code, so my objection is soft. Watchable and WatchableCounter were removed. The design of SynchronizedFixedQueue is now simpler.It uses a single lock and a single condition to synchronize all the members of the class.
Said Abou-Hallawa
Comment 29 2016-10-04 13:47:09 PDT
(In reply to comment #19) > (In reply to comment #18) > > (In reply to comment #14) > > > Comment on attachment 289700 [details] > > > Patch > > > > > > Can SynchronizedFixedQueue just use a single lock and condition variable > > > instead of three? > > > > > > I believe that enqueue can lock, check for shutdown, enqueue, and then > > > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and > > > then dequeue. That's one lock and condition variable instead of three, and > > > two fewer state variables (m_empty and m_full). > > > > > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding > > > the queue lock? That means I can't write an algorithm that optionally > > > dequeues a second item because I will deadlock. > > > > If we use a single lock, that will make each thread waits for the other > > thread(s) to release the lock. For example, if a producer wants to insert a > > request in the queue it has to wait for the the consumer thread to finish > > even if the queue has empty spaces. I think we should make all threads > > compete for a single lock only when accessing the queue. > > I don't think you're getting any extra concurrency over what you'd get with > just one lock. The queue lock is the bottleneck either way. > > > > > But to allow two threads to proceed at the same time without getting > > incorrect value for the queue current count, two additional semaphores have > > to be used, one for the fullCounter and the other for the emptyCounter. One > > semaphore is used to block the thread until it can proceed and the other > > semaphore is used to notify the other thread with completion. > > > > Producer: > > m_emptyCounter.decrement(); // Blocks execution until there is an empty > > space. > > { > > LockHolder lockHolder(m_mutex); > > m_queue.append(value); > > } > > m_fullCounter.increment(); // Notify the other thread that there is an > > item for processing. > > > > > > Consumer: > > m_fullCounter.decrement(); // Blocks execution until there is an item to > > be processed. > > { > > LockHolder lockHolder(m_mutex); > > // Remove the first of the queue. > > value = m_queue.first(); > > m_queue.removeFirst(); > > } > > m_emptyCounter.increment(); // Notify the other threads that an item was > > removed from the queue. > > Or you could just do this: > > template<typename T> > class Queue { > public: > void enqueue(T value) > { > LockHolder holder(m_lock); > while (m_data.size() >= limit) > m_cond.wait(m_lock); > m_data.append(value); > m_cond.notifyAll(); > } > > T dequeue() > { > LockHolder locker(m_lock); > while (m_data.isEmpty()) > m_cond.wait(m_lock); > T result = m_data.takeFirst(); > m_cond.notifyAll(); > return result; > } > private: > Deque<T> m_data; > Lock m_lock; > Condition m_condition; > }; > > This has one lock, one condition variable, and will probably perform as well > as your queue. I changed the class to be almost as you suggested expect for one thing: the close() function. I need this function because the image decoding thread has to be alive all the time as long as the image has many frames and it is animating. But I also need to terminate the thread when the Image object itself is deleted. To do that dequeue() has to return a bool saying if it returns because a successful dequeuing or because of closing the queue. And to have a robust close() I needed to track the active working threads to be sure that by the end of close() all the threads are terminated. So I added registerWorker(), unregisterWorker() and isRegisteredWorker(). I keep the worker in a HashSet. The close() function returns only when the size of the HashSet is zero. The question I have is: I use the same condition with different predicates for closing the queue and terminating the workers as well as for enqueuing/dequeuing the elements. This seems to work. But should I create different conditions for different predicates?
Said Abou-Hallawa
Comment 30 2016-10-04 13:49:52 PDT
(In reply to comment #21) > Comment on attachment 290413 [details] > Patch > > Here's a design for enqueue / dequeue that I think would work, with less > complexity: > > (1) enqueue: > > LockHolder lockHolder(m_mutex); > m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size() < > BufferSize; }; > if (m_isShutDown) > return false; > m_queue.append(value); > m_condition.notifyAll(); > return true; > > (2) dequeue: > > LockHolder lockHolder(m_mutex); > m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size(); }; > if (m_isShutDown) > return false; > value = m_queue.first(); > m_queue.removeFirst(); > m_condition.notifyAll(); > return true; You are right. This design is simpler. I implemented it in a edition to few other functions to robustly handle the close() function.
Said Abou-Hallawa
Comment 31 2016-10-05 10:18:08 PDT
Geoffrey Garen
Comment 32 2016-10-06 16:16:29 PDT
Comment on attachment 290722 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290722&action=review > Source/WTF/wtf/SynchronizedFixedQueue.h:153 > + HashSet<const void*> m_workers; This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue. I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them. Another option is to use a separate data structure entirely.
Said Abou-Hallawa
Comment 33 2016-10-07 10:51:08 PDT
Said Abou-Hallawa
Comment 34 2016-10-07 10:58:18 PDT
Comment on attachment 290722 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290722&action=review >> Source/WTF/wtf/SynchronizedFixedQueue.h:153 >> + HashSet<const void*> m_workers; > > This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue. > > I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them. > > Another option is to use a separate data structure entirely. Tracking the workers was removed form the SynchronizedFixedQueue. Instead I am using a BinarySemaphore in caller of SynchronizedFixedQueue::close(). Here is how it is working: void Caller::start() { workQueue()->dispatch([this] { while (m_synchronizedFixedQueue.dequeue(item)) processItem(item); m_closeSemaphore.signal(); } } void Caller::stop() { m_synchronizedFixedQueue.close(); m_closeSemaphore.wait(std::numeric_limits<double>::max()); }
Geoffrey Garen
Comment 35 2016-10-11 10:56:59 PDT
Comment on attachment 290944 [details] Patch View in context: https://bugs.webkit.org/attachment.cgi?id=290944&action=review r=me > Source/WTF/wtf/SynchronizedFixedQueue.h:47 > + void open() > + { > + if (isOpen()) > + return; We should probably lock here. I think the branch on m_open is a race.
Said Abou-Hallawa
Comment 36 2016-10-11 11:40:00 PDT
WebKit Commit Bot
Comment 37 2016-10-11 12:38:12 PDT
Comment on attachment 291279 [details] Patch Clearing flags on attachment: 291279 Committed r207156: <http://trac.webkit.org/changeset/207156>
WebKit Commit Bot
Comment 38 2016-10-11 12:38:20 PDT
All reviewed patches have been landed. Closing bug.
Note You need to log in before you can comment on or make changes to this bug.