WebKit Bugzilla
RESOLVED FIXED
Bug 162478
Add SynchronizedFixedQueue class
https://bugs.webkit.org/show_bug.cgi?id=162478
Summary
Add SynchronizedFixedQueue class
Said Abou-Hallawa
Reported
2016-09-22 18:15:50 PDT
The purpose of this class is to safely synchronize enqueuing and dequeuing elements from a FixedQueue. This class can be used to implement a producer-consumer pattern.
Attachments
Patch (17.49 KB, patch), 2016-09-22 18:30 PDT, Said Abou-Hallawa
Patch (17.50 KB, patch), 2016-09-22 18:32 PDT, Said Abou-Hallawa
Patch (33.13 KB, patch), 2016-09-23 12:26 PDT, Said Abou-Hallawa
Patch (33.12 KB, patch), 2016-09-23 12:57 PDT, Said Abou-Hallawa
Patch (33.18 KB, patch), 2016-09-23 13:19 PDT, Said Abou-Hallawa
Patch (26.97 KB, patch), 2016-09-30 17:50 PDT, Said Abou-Hallawa
Patch (24.18 KB, patch), 2016-10-04 13:27 PDT, Said Abou-Hallawa
Patch (24.03 KB, patch), 2016-10-05 10:18 PDT, Said Abou-Hallawa
Patch (22.19 KB, patch), 2016-10-07 10:51 PDT, Said Abou-Hallawa
Patch (22.21 KB, patch), 2016-10-11 11:40 PDT, Said Abou-Hallawa
Said Abou-Hallawa
Comment 1
2016-09-22 18:30:01 PDT
Created
attachment 289642
[details]
Patch
Said Abou-Hallawa
Comment 2
2016-09-22 18:32:46 PDT
Created
attachment 289643
[details]
Patch
Alexey Proskuryakov
Comment 3
2016-09-22 20:21:54 PDT
Safe is a very overloaded word, so it doesn't really explain the difference. Would something like SynchronizefSafeQueue be accurate?
Alexey Proskuryakov
Comment 4
2016-09-22 20:22:45 PDT
I meant "Synchronized".
Simon Fraser (smfr)
Comment 5
2016-09-23 10:55:50 PDT
Comment on
attachment 289643
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=289643&action=review
> Source/WTF/wtf/Broadcast.h:35
> +class Broadcast {
I think this name needs to be improved. What is this broadcasting? It seems to be a range?
> Source/WTF/wtf/FixedQueue.h:31
> +class FixedQueue {
Fixed in what sense? Fixed size?
> Source/WTF/wtf/SafeFixedQueue.h:36
> +class SafeFixedQueue : public FixedQueue<T, BufferSize> {
Safe in what sense?
Geoffrey Garen
Comment 6
2016-09-23 12:04:55 PDT
Comment on
attachment 289643
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=289643&action=review
> Source/WTF/wtf/Broadcast.h:102
> +    void decrement()
> +    {
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); });
> +    }
> +
> +    void increment()
> +    {
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value < m_maxValue && ++m_value); });
> +    }
It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.
> Source/WTF/wtf/FixedQueue.h:44
> +        if (isFull())
> +            return false;
I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames.

Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.
> Source/WTF/wtf/FixedQueue.h:87
> +        size_t count = 1;
> +        for (size_t head = m_head; head != m_tail; head = (head + 1) % BufferSize)
> +            ++count;
I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag.
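Geoff's O(1) size computation can be sketched as a standalone function. This is illustrative only; the capacity, parameter names, and the isEmpty flag are assumptions, not the patch's actual members:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical fixed capacity, standing in for the BufferSize template parameter.
constexpr std::size_t bufferSize = 8;

// O(1) element count for a circular buffer. head == tail is ambiguous
// (completely empty or completely full), so an explicit flag disambiguates.
std::size_t queueSize(std::size_t head, std::size_t tail, bool isEmpty)
{
    if (head == tail)
        return isEmpty ? 0 : bufferSize;
    if (head < tail)
        return tail - head;
    return bufferSize - head + tail; // The live range wraps past the end of the buffer.
}
```

Keeping a size counter, as Geoff also suggests, avoids the head/tail ambiguity entirely at the cost of one extra member.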
> Source/WTF/wtf/FixedQueue.h:101
> +using WTF::FixedQueue;
Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?

Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.
Said Abou-Hallawa
Comment 7
2016-09-23 12:26:51 PDT
Created
attachment 289695
[details]
Patch
Said Abou-Hallawa
Comment 8
2016-09-23 12:44:04 PDT
Comment on
attachment 289643
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=289643&action=review
>> Source/WTF/wtf/Broadcast.h:35
>> +class Broadcast {
>
> I think this name needs to be improved. What is this broadcasting? It seems to be a range?
This is what Geoff suggested here:
https://bugs.webkit.org/attachment.cgi?id=287822&action=review
. But I will try to come up with a better name.
>> Source/WTF/wtf/Broadcast.h:102
>> +    }
>
> It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.
I did not understand where the problem is. Is it in how the predicate is written? Or is it in the concept itself? Did you not like having to wait for a predicate and at the same time change the value of a member once the predicate is true?
>> Source/WTF/wtf/FixedQueue.h:31
>> +class FixedQueue {
>
> Fixed in what sense? Fixed size?
This is what Geoff suggested here:
https://bugs.webkit.org/attachment.cgi?id=287822&action=review
. But I will try to come up with a better name.
>> Source/WTF/wtf/FixedQueue.h:44
>> +        return false;
>
> I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames.
>
> Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.
I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following:

    if (m_queue.isFull())
        m_queue.dequeue();
    m_queue.enqueue();
>> Source/WTF/wtf/FixedQueue.h:87
>> +        ++count;
>
> I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag.
Done.
>> Source/WTF/wtf/FixedQueue.h:101
>> +using WTF::FixedQueue;
>
> Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?
>
> Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.
This is fine too. But the suggestions so far are just optimizations and the class is not a big deal. For the size() function I have to admit I was lazy writing it because I added it only for testing.
>> Source/WTF/wtf/SafeFixedQueue.h:36
>> +class SafeFixedQueue : public FixedQueue<T, BufferSize> {
>
> Safe in what sense?
I changed it to SynchronizedFixedQueue as Alexey suggested in
https://bugs.webkit.org/show_bug.cgi?id=162478#c3
.
Said Abou-Hallawa
Comment 9
2016-09-23 12:57:11 PDT
Created
attachment 289697
[details]
Patch
Said Abou-Hallawa
Comment 10
2016-09-23 13:19:18 PDT
Created
attachment 289700
[details]
Patch
Simon Fraser (smfr)
Comment 11
2016-09-23 13:25:15 PDT
Comment on
attachment 289697
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=289697&action=review
> Source/WTF/wtf/Broadcast.h:35
> +class Broadcast {
I could call this ThreadsafeCounter or something.
> Source/WTF/wtf/Broadcast.h:78
> +        m_condition.notifyAll();
Should you notify if the value didn't change?
> Source/WTF/wtf/Broadcast.h:94
> +    void decrement()
I think this and increment() need better names, that indicate their waiting/blocking nature.
> Source/WTF/wtf/FixedQueue.h:31
> +class FixedQueue {
Why not CircularQueue?
> Source/WTF/wtf/FixedQueue.h:90
> +    size_t m_head { 0 };
> +    size_t m_tail { 0 };
Call these m_headIndex and m_tailIndex.
> Source/WTF/wtf/SynchronizedFixedQueue.h:89
> +    Broadcast<int> m_empty;
> +    Broadcast<int> m_full;
These names don't tell me what these do. And why do you need both, and not just a single thing that tracks the fullness level?
Geoffrey Garen
Comment 12
2016-09-23 14:13:09 PDT
> > Source/WTF/wtf/Broadcast.h:35
> > +class Broadcast {
>
> I could call this ThreadsafeCounter or something.
Some more options: Watchable<T>, AtomicWatchable<T>, Observable<T>, AtomicObservable<T>. Or, if we like "Counter" in the name: WatchableCounter<T>, AtomicWatchableCounter<T>, ObservableCounter<T>, AtomicObservableCounter<T>. (Note that the class is not only thread safe; it also wakes up other threads to signal its changes.)

I suppose, since this is just a counter, there's no point in the template parameter. We can just make the data size_t. The template abstraction would be justified if we removed the explicit increment() and decrement() functions and just provided functions for "read the data", "wait until the data changes", and "write the data".
> > It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.
> I did not understand where the problem is. Is it in how the predicate is written? Or
> is it in the concept itself? Did you not like having to wait for a predicate and at
> the same time change the value of a member once the predicate is true?
The waitUntil function declares that it takes a "predicate" argument. A "predicate" in logic is "something that is affirmed or denied concerning an argument of a proposition". It is very surprising for a function that is supposed to answer a yes/no question to have side-effects, and it is stranger still for those side-effects to potentially change the answer to the yes/no question. Consider this conversation:

Me: Are you going to the ballgame?
You: Because you asked me that, I have adopted a kitten.
Me: ?
You: And, now that I own a kitten, I'm too busy to go to the ballgame.
Me: ???

In fact, I just realized that this is a hard error. Condition variables are allowed to invoke their predicates spuriously, and in this case that would cause multiple increments or multiple decrements.
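The hard error Geoff points out can be shown without any real threading: if the condition-variable machinery is allowed to evaluate the predicate spuriously, a side-effecting predicate changes state once per evaluation. A minimal sketch, with hypothetical names mirroring the shape of the patch's decrement():

```cpp
#include <cassert>

struct Counter {
    int value { 2 };
    int minValue { 0 };

    // A "predicate" that also mutates state: each evaluation that finds
    // value > minValue decrements value as a side effect.
    bool sideEffectingPredicate()
    {
        return value > minValue && value--;
    }
};
```

Two evaluations of the predicate, which spurious wakeups make perfectly legal within one logical wait, decrement the counter twice.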
> > Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.
> I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following:
OK, but do we have any callers that want the behavior where new items are discarded? If we're writing a data structure abstraction, it should cater to our use case.
> > Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?
> >
> > Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.
> This is fine too. But the suggestions so far are just optimizations and the class is not a big deal. For the size() function I have to admit I was lazy writing it because I added it only for testing.
Is there any benefit to writing this data structure for ourselves? Put otherwise: What was your motivation in writing this data structure?
Simon Fraser (smfr)
Comment 13
2016-09-23 14:54:50 PDT
Comment on
attachment 289700
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=289700&action=review
> Source/WTF/wtf/Broadcast.h:96
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); });
This condition is wrong; m_value-- should check against the min.
> Source/WTF/wtf/SynchronizedFixedQueue.h:60
> +        m_empty.decrement();
Wait for a slot to become available.
> Source/WTF/wtf/SynchronizedFixedQueue.h:67
> +        m_full.increment();
Notify that a slot has become filled.
> Source/WTF/wtf/SynchronizedFixedQueue.h:74
> +        m_full.decrement();
Wait for a full slot to be dequeuable.
> Source/WTF/wtf/SynchronizedFixedQueue.h:83
> +        m_empty.increment();
Notify that a slot is now empty.
> Source/WTF/wtf/SynchronizedFixedQueue.h:89
> +    Broadcast<int> m_empty;
> +    Broadcast<int> m_full;
Names should reflect these are about empty and full slots.
Geoffrey Garen
Comment 14
2016-09-30 13:27:33 PDT
Comment on
attachment 289700
[details]
Patch

Can SynchronizedFixedQueue just use a single lock and condition variable instead of three?

I believe that enqueue can lock, check for shutdown, enqueue, and then broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and then dequeue. That's one lock and condition variable instead of three, and two fewer state variables (m_empty and m_full).

Why does SynchronizedFixedQueue::dequeue invoke its functor while holding the queue lock? That means I can't write an algorithm that optionally dequeues a second item because I will deadlock.
Said Abou-Hallawa
Comment 15
2016-09-30 17:50:13 PDT
Created
attachment 290413
[details]
Patch
Said Abou-Hallawa
Comment 16
2016-09-30 18:23:24 PDT
(In reply to comment #14)
> Comment on
attachment 289700
[details]
> Patch
>
> Can SynchronizedFixedQueue just use a single lock and condition variable
> instead of three?
>
> I believe that enqueue can lock, check for shutdown, enqueue, and then
> broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> then dequeue. That's one lock and condition variable instead of three, and
> two fewer state variables (m_empty and m_full).
>
> Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> the queue lock? That means I can't write an algorithm that optionally
> dequeues a second item because I will deadlock.
I cleaned the patch a little. I removed the FixedQueue template. I made SynchronizedFixedQueue hold a data member of type Deque instead. I changed SynchronizedFixedQueue::dequeue(), as you suggested, to not hold the lock while processing the item. I also replaced Broadcast with a template class called Watchable<T> and a class called WatchableCounter which is derived from Watchable<size_t>.
Filip Pizlo
Comment 17
2016-10-03 12:09:03 PDT
Comment on
attachment 290413
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290413&action=review
> Source/WTF/ChangeLog:17
> +        3. SafeFixedQueue: A fixed-size queue but its enqueue() and dequeue()
> +           are controlled by two WatchableCounter: m_empty and m_full.
As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction. I would have written it directly using a condition variable for empty and a condition variable for full. I think that would have resulted in less code, and so fewer things that could go wrong.

On the other hand, I grok this code, so my objection is soft.
Said Abou-Hallawa
Comment 18
2016-10-03 12:32:15 PDT
(In reply to comment #14)
> Comment on
attachment 289700
[details]
> Patch
>
> Can SynchronizedFixedQueue just use a single lock and condition variable
> instead of three?
>
> I believe that enqueue can lock, check for shutdown, enqueue, and then
> broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> then dequeue. That's one lock and condition variable instead of three, and
> two fewer state variables (m_empty and m_full).
>
> Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> the queue lock? That means I can't write an algorithm that optionally
> dequeues a second item because I will deadlock.
If we use a single lock, each thread will have to wait for the other thread(s) to release it. For example, if a producer wants to insert a request in the queue, it has to wait for the consumer thread to finish even if the queue has empty spaces. I think we should make all threads compete for a single lock only when accessing the queue.

But to allow two threads to proceed at the same time without getting an incorrect value for the queue's current count, two additional semaphores have to be used: one for the fullCounter and the other for the emptyCounter. One semaphore is used to block the thread until it can proceed and the other semaphore is used to notify the other thread of completion.

Producer:
    m_emptyCounter.decrement(); // Blocks execution until there is an empty space.
    {
        LockHolder lockHolder(m_mutex);
        m_queue.append(value);
    }
    m_fullCounter.increment(); // Notify the other thread that there is an item for processing.

Consumer:
    m_fullCounter.decrement(); // Blocks execution until there is an item to be processed.
    {
        LockHolder lockHolder(m_mutex);
        // Remove the first item of the queue.
        value = m_queue.first();
        m_queue.removeFirst();
    }
    m_emptyCounter.increment(); // Notify the other threads that an item was removed from the queue.
Filip Pizlo
Comment 19
2016-10-03 12:47:17 PDT
(In reply to comment #18)
> (In reply to comment #14)
> > Comment on attachment 289700 [details]
> > Patch
> >
> > Can SynchronizedFixedQueue just use a single lock and condition variable
> > instead of three?
> >
> > I believe that enqueue can lock, check for shutdown, enqueue, and then
> > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> > then dequeue. That's one lock and condition variable instead of three, and
> > two fewer state variables (m_empty and m_full).
> >
> > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> > the queue lock? That means I can't write an algorithm that optionally
> > dequeues a second item because I will deadlock.
>
> If we use a single lock, each thread will have to wait for the other
> thread(s) to release it. For example, if a producer wants to insert a
> request in the queue, it has to wait for the consumer thread to finish
> even if the queue has empty spaces. I think we should make all threads
> compete for a single lock only when accessing the queue.
I don't think you're getting any extra concurrency over what you'd get with just one lock. The queue lock is the bottleneck either way.
> But to allow two threads to proceed at the same time without getting an
> incorrect value for the queue's current count, two additional semaphores have
> to be used: one for the fullCounter and the other for the emptyCounter. One
> semaphore is used to block the thread until it can proceed and the other
> semaphore is used to notify the other thread of completion.
>
> Producer:
>     m_emptyCounter.decrement(); // Blocks execution until there is an empty space.
>     {
>         LockHolder lockHolder(m_mutex);
>         m_queue.append(value);
>     }
>     m_fullCounter.increment(); // Notify the other thread that there is an item for processing.
>
> Consumer:
>     m_fullCounter.decrement(); // Blocks execution until there is an item to be processed.
>     {
>         LockHolder lockHolder(m_mutex);
>         // Remove the first item of the queue.
>         value = m_queue.first();
>         m_queue.removeFirst();
>     }
>     m_emptyCounter.increment(); // Notify the other threads that an item was removed from the queue.
Or you could just do this:

    template<typename T>
    class Queue {
    public:
        void enqueue(T value)
        {
            LockHolder holder(m_lock);
            while (m_data.size() >= limit)
                m_condition.wait(m_lock);
            m_data.append(value);
            m_condition.notifyAll();
        }

        T dequeue()
        {
            LockHolder locker(m_lock);
            while (m_data.isEmpty())
                m_condition.wait(m_lock);
            T result = m_data.takeFirst();
            m_condition.notifyAll();
            return result;
        }

    private:
        Deque<T> m_data;
        Lock m_lock;
        Condition m_condition;
    };

This has one lock, one condition variable, and will probably perform as well as your queue.
Filip Pizlo
Comment 20
2016-10-03 12:49:38 PDT
Comment on
attachment 290413
[details]
Patch

I'm going to mark this r+ because I believe that the code is correct and it does everything that the changelog says it does. I suspect that this could be simplified, but I don't want my suspicion to block your work.
Geoffrey Garen
Comment 21
2016-10-03 13:07:01 PDT
Comment on
attachment 290413
[details]
Patch

Here's a design for enqueue / dequeue that I think would work, with less complexity:

(1) enqueue:

    LockHolder lockHolder(m_mutex);
    m_condition.wait(m_mutex, [this] { return m_isShutDown || m_queue.size() < BufferSize; });
    if (m_isShutDown)
        return false;
    m_queue.append(value);
    m_condition.notifyAll();
    return true;

(2) dequeue:

    LockHolder lockHolder(m_mutex);
    m_condition.wait(m_mutex, [this] { return m_isShutDown || m_queue.size(); });
    if (m_isShutDown)
        return false;
    value = m_queue.first();
    m_queue.removeFirst();
    m_condition.notifyAll();
    return true;
Geoffrey Garen
Comment 22
2016-10-03 13:10:47 PDT
> If we use a single lock, each thread will have to wait for the other
> thread(s) to release it. For example, if a producer wants to insert a
> request in the queue, it has to wait for the consumer thread to finish
> even if the queue has empty spaces. I think we should make all threads
> compete for a single lock only when accessing the queue.
This comment concerns me and demonstrates a problem in the current design that Simon ran into as well: By using semaphores, the current design implies that enqueue and dequeue can be concurrent, but in fact that isn't true, since they both acquire m_mutex.

Perhaps semaphores could be used to achieve an alternative lock-free design, but that isn't what we have here.
JF Bastien
Comment 23
2016-10-03 14:18:38 PDT
Comment on
attachment 290413
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290413&action=review
> Source/WTF/wtf/SynchronizedFixedQueue.h:107 > + Deque<T, BufferSize> m_queue;
You probably want to separate members onto different cache lines depending on which you expect to be read / written by threads.
Filip Pizlo
Comment 24
2016-10-03 14:34:34 PDT
(In reply to comment #23)
> Comment on
attachment 290413
[details]
> Patch
>
> View in context:
>
https://bugs.webkit.org/attachment.cgi?id=290413&action=review
> > Source/WTF/wtf/SynchronizedFixedQueue.h:107
> > +    Deque<T, BufferSize> m_queue;
>
> You probably want to separate members onto different cache lines depending
> on which you expect to be read / written by threads.
Does that matter if the queue requires a single lock to protect everything?
Filip Pizlo
Comment 25
2016-10-03 14:35:56 PDT
(In reply to comment #22)
> > If we use a single lock, that will make each thread waits for the other
> > thread(s) to release the lock. For example, if a producer wants to insert a
> > request in the queue it has to wait for the the consumer thread to finish
> > even if the queue has empty spaces. I think we should make all threads
> > compete for a single lock only when accessing the queue.
>
> This comment concerns me and demonstrates a problem in the current design
> that Simon ran into as well: By using semaphores, the current design implies
> that enqueue and dequeue can be concurrent, but in fact that isn't true,
> since they both acquire m_mutex.
>
> Perhaps semaphores could be used to achieve an alternative lock-free design,
> but that isn't what we have here.
If it's using semaphores then it's probably not lock-free. :-P How much perf do we need here? Note that the JSC GC's queueing, which scales to ~8 cores, does not use lock-freedom and protects itself with one lock.
Geoffrey Garen
Comment 26
2016-10-03 15:22:30 PDT
> How much perf do we need here?
I don't think this queue is going to be a bottleneck. It guards image decoding. So, the producer does a ~4ms decode and then enqueues and then the consumer does a dequeue while rendering a ~16ms frame. Meanwhile, this little queue takes like 100 microseconds max to lock and signal.
Said Abou-Hallawa
Comment 27
2016-10-04 13:27:08 PDT
Created
attachment 290638
[details]
Patch
Said Abou-Hallawa
Comment 28
2016-10-04 13:31:01 PDT
Comment on
attachment 290413
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290413&action=review
>> Source/WTF/ChangeLog:17
>> +           are controlled by two WatchableCounter: m_empty and m_full.
>
> As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction. I would have written it directly using a condition variable for empty and a condition variable for full. I think that would have resulted in less code, and so fewer things that could go wrong.
>
> On the other hand, I grok this code, so my objection is soft.
Watchable and WatchableCounter were removed. The design of SynchronizedFixedQueue is now simpler. It uses a single lock and a single condition to synchronize all the members of the class.
Said Abou-Hallawa
Comment 29
2016-10-04 13:47:09 PDT
(In reply to comment #19)
> (In reply to comment #18)
> > (In reply to comment #14)
> > > Comment on attachment 289700 [details]
> > > Patch
> > >
> > > Can SynchronizedFixedQueue just use a single lock and condition variable
> > > instead of three?
> > >
> > > I believe that enqueue can lock, check for shutdown, enqueue, and then
> > > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> > > then dequeue. That's one lock and condition variable instead of three, and
> > > two fewer state variables (m_empty and m_full).
> > >
> > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> > > the queue lock? That means I can't write an algorithm that optionally
> > > dequeues a second item because I will deadlock.
> >
> > If we use a single lock, each thread will have to wait for the other
> > thread(s) to release it. For example, if a producer wants to insert a
> > request in the queue, it has to wait for the consumer thread to finish
> > even if the queue has empty spaces. I think we should make all threads
> > compete for a single lock only when accessing the queue.
>
> I don't think you're getting any extra concurrency over what you'd get with
> just one lock. The queue lock is the bottleneck either way.
>
> > But to allow two threads to proceed at the same time without getting an
> > incorrect value for the queue's current count, two additional semaphores have
> > to be used: one for the fullCounter and the other for the emptyCounter. One
> > semaphore is used to block the thread until it can proceed and the other
> > semaphore is used to notify the other thread of completion.
> >
> > Producer:
> >     m_emptyCounter.decrement(); // Blocks execution until there is an empty space.
> >     {
> >         LockHolder lockHolder(m_mutex);
> >         m_queue.append(value);
> >     }
> >     m_fullCounter.increment(); // Notify the other thread that there is an item for processing.
> >
> > Consumer:
> >     m_fullCounter.decrement(); // Blocks execution until there is an item to be processed.
> >     {
> >         LockHolder lockHolder(m_mutex);
> >         // Remove the first item of the queue.
> >         value = m_queue.first();
> >         m_queue.removeFirst();
> >     }
> >     m_emptyCounter.increment(); // Notify the other threads that an item was removed from the queue.
>
> Or you could just do this:
>
>     template<typename T>
>     class Queue {
>     public:
>         void enqueue(T value)
>         {
>             LockHolder holder(m_lock);
>             while (m_data.size() >= limit)
>                 m_condition.wait(m_lock);
>             m_data.append(value);
>             m_condition.notifyAll();
>         }
>
>         T dequeue()
>         {
>             LockHolder locker(m_lock);
>             while (m_data.isEmpty())
>                 m_condition.wait(m_lock);
>             T result = m_data.takeFirst();
>             m_condition.notifyAll();
>             return result;
>         }
>
>     private:
>         Deque<T> m_data;
>         Lock m_lock;
>         Condition m_condition;
>     };
>
> This has one lock, one condition variable, and will probably perform as well
> as your queue.
I changed the class to be almost as you suggested except for one thing: the close() function. I need this function because the image decoding thread has to be alive all the time as long as the image has many frames and it is animating. But I also need to terminate the thread when the Image object itself is deleted. To do that, dequeue() has to return a bool saying whether it returns because of a successful dequeuing or because the queue was closed.

And to have a robust close() I needed to track the active working threads to be sure that by the end of close() all the threads are terminated. So I added registerWorker(), unregisterWorker() and isRegisteredWorker(). I keep the workers in a HashSet. The close() function returns only when the size of the HashSet is zero.

The question I have is: I use the same condition with different predicates for closing the queue and terminating the workers, as well as for enqueuing/dequeuing the elements. This seems to work. But should I create different conditions for different predicates?
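The close()-aware queue described above can be sketched with std:: primitives. This is an assumption-laden illustration: the class name, the capacity, and the std::mutex/std::condition_variable machinery stand in for WTF's Lock and Condition, and it is not the patch's code:

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// dequeue() blocks until an item is available or the queue is closed, and
// reports which happened via its return value, mirroring Said's description.
template<typename T>
class ClosableQueue {
public:
    bool enqueue(T value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return m_closed || m_queue.size() < m_limit; });
        if (m_closed)
            return false;
        m_queue.push_back(std::move(value));
        m_condition.notify_all();
        return true;
    }

    bool dequeue(T& value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return m_closed || !m_queue.empty(); });
        if (m_closed)
            return false;
        value = std::move(m_queue.front());
        m_queue.pop_front();
        m_condition.notify_all();
        return true;
    }

    void close()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_closed = true;
        m_condition.notify_all(); // Wake every blocked producer and consumer.
    }

private:
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condition;
    bool m_closed { false };
    std::size_t m_limit { 16 }; // Hypothetical capacity.
};
```

On the question of one condition versus several: a single condition variable with different predicates works because every state change calls notify_all(), so any waiter whose predicate became true gets a chance to recheck it; separate conditions would only reduce spurious wakeups.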
Said Abou-Hallawa
Comment 30
2016-10-04 13:49:52 PDT
(In reply to comment #21)
> Comment on
attachment 290413
[details]
> Patch
>
> Here's a design for enqueue / dequeue that I think would work, with less
> complexity:
>
> (1) enqueue:
>
>     LockHolder lockHolder(m_mutex);
>     m_condition.wait(m_mutex, [this] { return m_isShutDown || m_queue.size() < BufferSize; });
>     if (m_isShutDown)
>         return false;
>     m_queue.append(value);
>     m_condition.notifyAll();
>     return true;
>
> (2) dequeue:
>
>     LockHolder lockHolder(m_mutex);
>     m_condition.wait(m_mutex, [this] { return m_isShutDown || m_queue.size(); });
>     if (m_isShutDown)
>         return false;
>     value = m_queue.first();
>     m_queue.removeFirst();
>     m_condition.notifyAll();
>     return true;
You are right. This design is simpler. I implemented it, in addition to a few other functions to robustly handle the close() function.
Said Abou-Hallawa
Comment 31
2016-10-05 10:18:08 PDT
Created
attachment 290722
[details]
Patch
Geoffrey Garen
Comment 32
2016-10-06 16:16:29 PDT
Comment on
attachment 290722
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290722&action=review
> Source/WTF/wtf/SynchronizedFixedQueue.h:153
> +    HashSet<const void*> m_workers;
This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue.

I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them.

Another option is to use a separate data structure entirely.
Said Abou-Hallawa
Comment 33
2016-10-07 10:51:08 PDT
Created
attachment 290944
[details]
Patch
Said Abou-Hallawa
Comment 34
2016-10-07 10:58:18 PDT
Comment on
attachment 290722
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290722&action=review
>> Source/WTF/wtf/SynchronizedFixedQueue.h:153
>> +    HashSet<const void*> m_workers;
>
> This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue.
>
> I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them.
>
> Another option is to use a separate data structure entirely.
Tracking the workers was removed from the SynchronizedFixedQueue. Instead I am using a BinarySemaphore in the caller of SynchronizedFixedQueue::close(). Here is how it is working:

    void Caller::start()
    {
        workQueue()->dispatch([this] {
            while (m_synchronizedFixedQueue.dequeue(item))
                processItem(item);
            m_closeSemaphore.signal();
        });
    }

    void Caller::stop()
    {
        m_synchronizedFixedQueue.close();
        m_closeSemaphore.wait(std::numeric_limits<double>::max());
    }
Geoffrey Garen
Comment 35
2016-10-11 10:56:59 PDT
Comment on
attachment 290944
[details]
Patch

View in context:
https://bugs.webkit.org/attachment.cgi?id=290944&action=review
r=me
> Source/WTF/wtf/SynchronizedFixedQueue.h:47
> +    void open()
> +    {
> +        if (isOpen())
> +            return;
We should probably lock here. I think the branch on m_open is a race.
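The fix Geoff suggests might look like the following sketch, which takes the lock before testing the flag. std::mutex stands in for WTF's Lock, and the class and member names are illustrative, not the patch's:

```cpp
#include <cassert>
#include <mutex>

class QueueSketch {
public:
    void open()
    {
        // Lock before reading m_open so this doesn't race with close().
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_open)
            return;
        m_open = true;
    }

    void close()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_open = false;
    }

    bool isOpen()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_open;
    }

private:
    std::mutex m_mutex;
    bool m_open { true };
};
```

Note that isOpen() is no longer called from open() here, since that would try to acquire the already-held (non-recursive) lock.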
Said Abou-Hallawa
Comment 36
2016-10-11 11:40:00 PDT
Created
attachment 291279
[details]
Patch
WebKit Commit Bot
Comment 37
2016-10-11 12:38:12 PDT
Comment on
attachment 291279
[details]
Patch

Clearing flags on attachment: 291279

Committed
r207156
: <
http://trac.webkit.org/changeset/207156
>
WebKit Commit Bot
Comment 38
2016-10-11 12:38:20 PDT
All reviewed patches have been landed. Closing bug.