Bug 162478 - Add SynchronizedFixedQueue class
Summary: Add SynchronizedFixedQueue class
Status: RESOLVED FIXED
Alias: None
Product: WebKit
Classification: Unclassified
Component: New Bugs
Version: WebKit Nightly Build
Hardware: Unspecified Unspecified
Importance: P2 Normal
Assignee: Said Abou-Hallawa
URL:
Keywords:
Depends on:
Blocks: 155322 155546
Reported: 2016-09-22 18:15 PDT by Said Abou-Hallawa
Modified: 2016-10-11 12:38 PDT (History)
CC List: 10 users

See Also:


Attachments
Patch (17.49 KB, patch)
2016-09-22 18:30 PDT, Said Abou-Hallawa
no flags
Patch (17.50 KB, patch)
2016-09-22 18:32 PDT, Said Abou-Hallawa
no flags
Patch (33.13 KB, patch)
2016-09-23 12:26 PDT, Said Abou-Hallawa
no flags
Patch (33.12 KB, patch)
2016-09-23 12:57 PDT, Said Abou-Hallawa
no flags
Patch (33.18 KB, patch)
2016-09-23 13:19 PDT, Said Abou-Hallawa
no flags
Patch (26.97 KB, patch)
2016-09-30 17:50 PDT, Said Abou-Hallawa
no flags
Patch (24.18 KB, patch)
2016-10-04 13:27 PDT, Said Abou-Hallawa
no flags
Patch (24.03 KB, patch)
2016-10-05 10:18 PDT, Said Abou-Hallawa
no flags
Patch (22.19 KB, patch)
2016-10-07 10:51 PDT, Said Abou-Hallawa
no flags
Patch (22.21 KB, patch)
2016-10-11 11:40 PDT, Said Abou-Hallawa
no flags

Description Said Abou-Hallawa 2016-09-22 18:15:50 PDT
The purpose of this class is to safely synchronize enqueuing and dequeuing elements from a FixedQueue. This class can be used to implement a producer-consumer pattern.
Comment 1 Said Abou-Hallawa 2016-09-22 18:30:01 PDT
Created attachment 289642 [details]
Patch
Comment 2 Said Abou-Hallawa 2016-09-22 18:32:46 PDT
Created attachment 289643 [details]
Patch
Comment 3 Alexey Proskuryakov 2016-09-22 20:21:54 PDT
Safe is a very overloaded word, so it doesn't really explain the difference. Would something like SynchronizefSafeQueue be accurate?
Comment 4 Alexey Proskuryakov 2016-09-22 20:22:45 PDT
I meant "Synchronized".
Comment 5 Simon Fraser (smfr) 2016-09-23 10:55:50 PDT
Comment on attachment 289643 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review

> Source/WTF/wtf/Broadcast.h:35
> +class Broadcast {

I think this name needs to be improved. What is this broadcasting? It seems to be a range?

> Source/WTF/wtf/FixedQueue.h:31
> +class FixedQueue {

Fixed in what sense? Fixed size?

> Source/WTF/wtf/SafeFixedQueue.h:36
> +class SafeFixedQueue : public FixedQueue<T, BufferSize> {

Safe in what sense?
Comment 6 Geoffrey Garen 2016-09-23 12:04:55 PDT
Comment on attachment 289643 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review

> Source/WTF/wtf/Broadcast.h:102
> +    void decrement()
> +    {
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); });
> +    }
> +
> +    void increment()
> +    {
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value < m_maxValue && ++m_value); });
> +    }

It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.

> Source/WTF/wtf/FixedQueue.h:44
> +        if (isFull())
> +            return false;

I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames.

Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.

> Source/WTF/wtf/FixedQueue.h:87
> +        size_t count = 1;
> +        for (size_t head = m_head; head != m_tail; head = (head + 1) % BufferSize)
> +            ++count;

I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag.
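Geoff's suggestion is the standard ring-buffer arithmetic, which can be checked in isolation. A minimal stand-alone sketch (head, tail, and bufferSize are stand-ins for the patch's m_head, m_tail, and BufferSize; when head == tail, a flag or a stored count is still needed to distinguish an empty buffer from a full one):

```cpp
#include <cassert>
#include <cstddef>

// Ring-buffer occupancy without a linear scan. The parameters stand in
// for the patch's m_head, m_tail, and BufferSize members.
constexpr size_t ringSize(size_t head, size_t tail, size_t bufferSize, bool isEmpty)
{
    if (head == tail)
        return isEmpty ? 0 : bufferSize; // indices coincide: empty or full
    if (head < tail)
        return tail - head; // contiguous occupied region
    return bufferSize - head + tail; // occupied region wraps around the end
}
```

Keeping a size counter instead, as Geoff also suggests, removes the empty/full ambiguity entirely.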

> Source/WTF/wtf/FixedQueue.h:101
> +using WTF::FixedQueue;

Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?

Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.
Comment 7 Said Abou-Hallawa 2016-09-23 12:26:51 PDT
Created attachment 289695 [details]
Patch
Comment 8 Said Abou-Hallawa 2016-09-23 12:44:04 PDT
Comment on attachment 289643 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=289643&action=review

>> Source/WTF/wtf/Broadcast.h:35
>> +class Broadcast {
> 
> I think this name needs to be improved. What is this broadcasting? It seems to be a range?

This is what Geoff suggested here: https://bugs.webkit.org/attachment.cgi?id=287822&action=review. But I will try to come up with a better name.

>> Source/WTF/wtf/Broadcast.h:102
>> +    }
> 
> It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.

I do not understand where the problem is. Is it in how the predicate is written, or in the concept itself? Do you object to waiting on a predicate that also changes the value of a member once the predicate becomes true?

>> Source/WTF/wtf/FixedQueue.h:31
>> +class FixedQueue {
> 
> Fixed in what sense? Fixed size?

This is what Geoff suggested here: https://bugs.webkit.org/attachment.cgi?id=287822&action=review. But I will try to come up with a better name.

>> Source/WTF/wtf/FixedQueue.h:44
>> +            return false;
> 
> I think this means that image decoding, if it runs faster than image rendering, will throw away new frames instead of old frames.
> 
> Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.

I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following:

if (m_queue.isFull())
    m_queue.dequeue();

m_queue.enqueue(value);

>> Source/WTF/wtf/FixedQueue.h:87
>> +            ++count;
> 
> I think you can compute size without linear search. If m_head < m_tail, size is m_tail - m_head. Otherwise, size is distance from m_buffer to m_tail (using %) + distance from m_head (using %) to m_buffer end. Or you can just keep a size counter instead of an m_isEmpty flag.

Done.

>> Source/WTF/wtf/FixedQueue.h:101
>> +using WTF::FixedQueue;
> 
> Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?
> 
> Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.

This is fine too. But the suggestions so far are just optimizations, and the class is not a big deal. As for the size() function, I have to admit I was lazy writing it because I added it only for testing.

>> Source/WTF/wtf/SafeFixedQueue.h:36
>> +class SafeFixedQueue : public FixedQueue<T, BufferSize> {
> 
> Safe in what sense?

I changed it to SynchronizedFixedQueue as Alexey suggested in https://bugs.webkit.org/show_bug.cgi?id=162478#c3.
Comment 9 Said Abou-Hallawa 2016-09-23 12:57:11 PDT
Created attachment 289697 [details]
Patch
Comment 10 Said Abou-Hallawa 2016-09-23 13:19:18 PDT
Created attachment 289700 [details]
Patch
Comment 11 Simon Fraser (smfr) 2016-09-23 13:25:15 PDT
Comment on attachment 289697 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=289697&action=review

> Source/WTF/wtf/Broadcast.h:35
> +class Broadcast {

I could call this ThreadsafeCounter or something.

> Source/WTF/wtf/Broadcast.h:78
> +        m_condition.notifyAll();

Should you notify if the value didn't change?

> Source/WTF/wtf/Broadcast.h:94
> +    void decrement()

I think this and increment() need better names, that indicate their waiting/blocking nature.

> Source/WTF/wtf/FixedQueue.h:31
> +class FixedQueue {

Why not CircularQueue?

> Source/WTF/wtf/FixedQueue.h:90
> +    size_t m_head { 0 };
> +    size_t m_tail { 0 };

Call these m_headIndex and m_tailIndex.

> Source/WTF/wtf/SynchronizedFixedQueue.h:89
> +    Broadcast<int> m_empty;
> +    Broadcast<int> m_full;

These names don't tell me what these do. And why do you need both, and not just a single thing that tracks the fullness level?
Comment 12 Geoffrey Garen 2016-09-23 14:13:09 PDT
> > Source/WTF/wtf/Broadcast.h:35
> > +class Broadcast {
> 
> I could call this ThreadsafeCounter or something.

Some more options:

Watchable<T>, AtomicWatchable<T>, Observable<T>, AtomicObservable<T>.

Or, if we like "Counter" in the name:

WatchableCounter<T>, AtomicWatchableCounter<T>, ObservableCounter<T>, AtomicObservableCounter<T>.

(Note that the class is not only thread safe; it also wakes up other threads to signal its changes.)

I suppose, since this is just a counter, there's no point in the template parameter. We can just make the data size_t.

The template abstraction would be justified if we removed the explicit increment() and decrement() functions and we just provided functions for "read the data", "wait until the data changes", and "write the data".

> > It's very sneaky to disguise a state change as a condition variable predicate. Probably too sneaky.

> I did not understand where the problem is. Is it in how the predicate is written? Or
> is it in the concept itself? Did not you like having to wait for a predicate and at
> the same time change the value of a member once the predicate is true?

The waitUntil function declares that it takes a "predicate" argument. A "predicate" in logic is "something that is affirmed or denied concerning an argument of a proposition". It is very surprising for a function that is supposed to answer a yes/no question to have side-effects, and it is stranger still for those side-effects to potentially change the answer to the yes/no question.

Consider this conversation:

Me: Are you going to the ballgame?

You: Because you asked me that, I have adopted a kitten.

Me: ?

You: And, now that I own a kitten, I'm too busy to go the ballgame.

Me: ???

In fact, I just realized that this is a hard error. Condition variables are allowed to invoke their predicates spuriously, and in this case that would cause multiple increment or multiple decrement.
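The hazard Geoff describes can be shown without a condition variable at all: since a condition variable may evaluate its predicate any number of times, a predicate with side effects performs an extra mutation on every spurious re-check. A minimal sketch (SneakyCounter is a hypothetical stand-in for the Broadcast counter, mirroring the patch's `m_value > m_minValue && m_value--` shape):

```cpp
#include <cassert>

// A "predicate" that mutates state when evaluated, as in the patch.
// Each evaluation that answers "yes" also consumes a slot, so every
// spurious re-evaluation by a condition variable consumes an extra one.
struct SneakyCounter {
    int value { 2 };
    bool decrementIfPositive() { return value > 0 && value--; }
};
```

A pure predicate (check only), with the decrement done separately under the lock after wait() returns, avoids this.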

> > Is that the behavior we want? My understanding is that most graphics systems skip old frames when under time pressure.

> I think this can be left to the caller. For the case of dropping the oldest item, the caller can do the following:

OK, but do we have any callers that want the behavior where new items are discarded? If we're writing a data structure abstraction, it should cater to our use case.

> > Do we benefit specifically from a fixed-sized buffer, or should we just use std::queue with a custom allocator that calls WTF::fastMalloc?
> > 
> > Part of why I'm asking is that this is the second time I think I've spotted some problems in this class, so I see some potential benefit from reusing known-good code.

> This is fine too. But the suggestions so far are just optimizations and the class is not a big deal. For the size() function I have to admit I was lazy writing it because I added it only for testing.

Is there any benefit to writing this data structure for ourselves?

Put otherwise: What was your motivation in writing this data structure?
Comment 13 Simon Fraser (smfr) 2016-09-23 14:54:50 PDT
Comment on attachment 289700 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=289700&action=review

> Source/WTF/wtf/Broadcast.h:96
> +        waitUntilAndNotify([this]() { return m_isShutdown || (m_value > m_minValue && m_value--); });

This condition is wrong; m_value-- should check against the min.

> Source/WTF/wtf/SynchronizedFixedQueue.h:60
> +        m_empty.decrement();

Wait for slot to become available.

> Source/WTF/wtf/SynchronizedFixedQueue.h:67
> +        m_full.increment();

Notify that a slot has become filled.

> Source/WTF/wtf/SynchronizedFixedQueue.h:74
> +        m_full.decrement();

Wait for a full slot to be dequeable

> Source/WTF/wtf/SynchronizedFixedQueue.h:83
> +        m_empty.increment();

Notify that a slot is now empty

> Source/WTF/wtf/SynchronizedFixedQueue.h:89
> +    Broadcast<int> m_empty;
> +    Broadcast<int> m_full;

Names should reflect these are about empty and full slots.
Comment 14 Geoffrey Garen 2016-09-30 13:27:33 PDT
Comment on attachment 289700 [details]
Patch

Can SynchronizedFixedQueue just use a single lock and condition variable instead of three?

I believe that enqueue can lock, check for shutdown, enqueue, and then broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and then dequeue. That's one lock and condition variable instead of three, and two fewer state variables (m_empty and m_full).

Why does SynchronizedFixedQueue::dequeue invoke its functor while holding the queue lock? That means I can't write an algorithm that optionally dequeues a second item because I will deadlock.
Comment 15 Said Abou-Hallawa 2016-09-30 17:50:13 PDT
Created attachment 290413 [details]
Patch
Comment 16 Said Abou-Hallawa 2016-09-30 18:23:24 PDT
(In reply to comment #14)
> Comment on attachment 289700 [details]
> Patch
> 
> Can SynchronizedFixedQueue just use a single lock and condition variable
> instead of three?
> 
> I believe that enqueue can lock, check for shutdown, enqueue, and then
> broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> then dequeue. That's one lock and condition variable instead of three, and
> two fewer state variables (m_empty and m_full).
> 
> Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> the queue lock? That means I can't write an algorithm that optionally
> dequeues a second item because I will deadlock.

I cleaned up the patch a little. I removed the FixedQueue template and made SynchronizedFixedQueue hold a data member of type Deque instead. I changed SynchronizedFixedQueue::dequeue(), as you suggested, to not hold the lock while processing the item. I also replaced Broadcast with a template class called Watchable<T> and a class called WatchableCounter, which is derived from Watchable<size_t>.
Comment 17 Filip Pizlo 2016-10-03 12:09:03 PDT
Comment on attachment 290413 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review

> Source/WTF/ChangeLog:17
> +        3. SafeFixedQueue: A fixed-size queue but its enqueue() and dequeue()
> +           are controlled by two WatchableCounter: m_empty and m_full.

As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction.  I would have written it directly using a condition variable for empty and a condition variable for full.  I think that would have resulted in less code, and so fewer things that could go wrong.

On the other hand, I grok this code, so my objection is soft.
Comment 18 Said Abou-Hallawa 2016-10-03 12:32:15 PDT
(In reply to comment #14)
> Comment on attachment 289700 [details]
> Patch
> 
> Can SynchronizedFixedQueue just use a single lock and condition variable
> instead of three?
> 
> I believe that enqueue can lock, check for shutdown, enqueue, and then
> broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> then dequeue. That's one lock and condition variable instead of three, and
> two fewer state variables (m_empty and m_full).
> 
> Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> the queue lock? That means I can't write an algorithm that optionally
> dequeues a second item because I will deadlock.

If we use a single lock, each thread will have to wait for the other thread(s) to release it. For example, if a producer wants to insert a request in the queue, it has to wait for the consumer thread to finish even if the queue has empty spaces. I think we should make all threads compete for a single lock only when accessing the queue.

But to allow two threads to proceed at the same time without reading an incorrect value for the queue's current count, two additional semaphores have to be used: one for the fullCounter and the other for the emptyCounter. One semaphore blocks the thread until it can proceed, and the other notifies the other thread of completion.

Producer:
    m_emptyCounter.decrement(); // Blocks execution until there is an empty space.
    {
        LockHolder lockHolder(m_mutex);
        m_queue.append(value);
    }
    m_fullCounter.increment(); // Notify the other thread that there is an item for processing.


Consumer:
    m_fullCounter.decrement(); // Blocks execution until there is an item to be processed.
    {
        LockHolder lockHolder(m_mutex);
        // Remove the first of the queue.
        value = m_queue.first();
        m_queue.removeFirst();
    }
    m_emptyCounter.increment(); // Notify the other threads that an item was removed from the queue.
Comment 19 Filip Pizlo 2016-10-03 12:47:17 PDT
(In reply to comment #18)
> (In reply to comment #14)
> > Comment on attachment 289700 [details]
> > Patch
> > 
> > Can SynchronizedFixedQueue just use a single lock and condition variable
> > instead of three?
> > 
> > I believe that enqueue can lock, check for shutdown, enqueue, and then
> > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> > then dequeue. That's one lock and condition variable instead of three, and
> > two fewer state variables (m_empty and m_full).
> > 
> > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> > the queue lock? That means I can't write an algorithm that optionally
> > dequeues a second item because I will deadlock.
> 
> If we use a single lock, that will make each thread waits for the other
> thread(s) to release the lock. For example, if a producer wants to insert a
> request in the queue it has to wait for the the consumer thread to finish
> even if the queue has empty spaces. I think we should make all threads
> compete for a single lock only when accessing the queue.

I don't think you're getting any extra concurrency over what you'd get with just one lock.  The queue lock is the bottleneck either way.

> 
> But to allow two threads to proceed at the same time without getting
> incorrect value for the queue current count, two additional semaphores have
> to be used, one for the fullCounter and the other for the emptyCounter. One
> semaphore is used to block the thread until it can proceed and the other
> semaphore is used to notify the other thread with completion.
> 
> Producer:
>     m_emptyCounter.decrement(); // Blocks execution until there is an empty
> space.
>     {
>         LockHolder lockHolder(m_mutex);
>         m_queue.append(value);
>     }
>     m_fullCounter.increment(); // Notify the other thread that there is an
> item for processing.
> 
> 
> Consumer:
>     m_fullCounter.decrement(); // Blocks execution until there is an item to
> be processed.
>     {
>         LockHolder lockHolder(m_mutex);
>         // Remove the first of the queue.
>         value = m_queue.first();
>         m_queue.removeFirst();
>     }
>     m_emptyCounter.increment(); // Notify the other threads that an item was
> removed from the queue.

Or you could just do this:

template<typename T, size_t limit>
class Queue {
public:
    void enqueue(T value)
    {
        LockHolder holder(m_lock);
        while (m_data.size() >= limit)
            m_cond.wait(m_lock);
        m_data.append(value);
        m_cond.notifyAll();
    }

    T dequeue()
    {
        LockHolder locker(m_lock);
        while (m_data.isEmpty())
            m_cond.wait(m_lock);
        T result = m_data.takeFirst();
        m_cond.notifyAll();
        return result;
    }
private:
    Deque<T> m_data;
    Lock m_lock;
    Condition m_cond;
};

This has one lock, one condition variable, and will probably perform as well as your queue.
Comment 20 Filip Pizlo 2016-10-03 12:49:38 PDT
Comment on attachment 290413 [details]
Patch

I'm going to mark this r+ because I believe that the code is correct and it does everything that the changelog says it does.  I suspect that this could be simplified, but I don't want my suspicion to block your work.
Comment 21 Geoffrey Garen 2016-10-03 13:07:01 PDT
Comment on attachment 290413 [details]
Patch

Here's a design for enqueue / dequeue that I think would work, with less complexity:

(1) enqueue:

LockHolder lockHolder(m_mutex);
m_condition.wait(m_mutex, [&] { return m_isShutDown || m_queue.size() < BufferSize; });
if (m_isShutDown)
    return false;
m_queue.append(value);
m_condition.notifyAll();
return true;

(2) dequeue:

LockHolder lockHolder(m_mutex);
m_condition.wait(m_mutex, [&] { return m_isShutDown || m_queue.size(); });
if (m_isShutDown)
    return false;
value = m_queue.first();
m_queue.removeFirst();
m_condition.notifyAll();
return true;
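A stand-alone sketch of this single-lock design, with std::mutex and std::condition_variable standing in for WTF's Lock and Condition, and a close() in the spirit of the shutdown flag used above (SingleLockQueue and its member names are illustrative):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <cassert>

// One lock and one condition variable synchronize everything; both
// enqueue and dequeue return false once the queue has been closed.
template<typename T, size_t BufferSize>
class SingleLockQueue {
public:
    bool enqueue(T value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return m_isShutDown || m_queue.size() < BufferSize; });
        if (m_isShutDown)
            return false;
        m_queue.push_back(std::move(value));
        m_condition.notify_all();
        return true;
    }

    bool dequeue(T& value)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return m_isShutDown || !m_queue.empty(); });
        if (m_isShutDown)
            return false;
        value = std::move(m_queue.front());
        m_queue.pop_front();
        m_condition.notify_all();
        return true;
    }

    void close()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_isShutDown = true;
        m_condition.notify_all(); // wake every waiter so it can observe shutdown
    }

private:
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condition;
    bool m_isShutDown { false };
};
```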
Comment 22 Geoffrey Garen 2016-10-03 13:10:47 PDT
> If we use a single lock, that will make each thread waits for the other
> thread(s) to release the lock. For example, if a producer wants to insert a
> request in the queue it has to wait for the the consumer thread to finish
> even if the queue has empty spaces. I think we should make all threads
> compete for a single lock only when accessing the queue.

This comment concerns me and demonstrates a problem in the current design that Simon ran into as well: By using semaphores, the current design implies that enqueue and dequeue can be concurrent, but in fact that isn't true, since they both acquire m_mutex.

Perhaps semaphores could be used to achieve an alternative lock-free design, but that isn't what we have here.
Comment 23 JF Bastien 2016-10-03 14:18:38 PDT
Comment on attachment 290413 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review

> Source/WTF/wtf/SynchronizedFixedQueue.h:107
> +    Deque<T, BufferSize> m_queue;

You probably want to separate members onto different cache lines depending on which you expect to be read / written by threads.
Comment 24 Filip Pizlo 2016-10-03 14:34:34 PDT
(In reply to comment #23)
> Comment on attachment 290413 [details]
> Patch
> 
> View in context:
> https://bugs.webkit.org/attachment.cgi?id=290413&action=review
> 
> > Source/WTF/wtf/SynchronizedFixedQueue.h:107
> > +    Deque<T, BufferSize> m_queue;
> 
> You probably want to separate members onto different cache lines depending
> on which you expect to be read / written by threads.

Does that matter if the queue requires a single lock to protect everything?
Comment 25 Filip Pizlo 2016-10-03 14:35:56 PDT
(In reply to comment #22)
> > If we use a single lock, that will make each thread waits for the other
> > thread(s) to release the lock. For example, if a producer wants to insert a
> > request in the queue it has to wait for the the consumer thread to finish
> > even if the queue has empty spaces. I think we should make all threads
> > compete for a single lock only when accessing the queue.
> 
> This comment concerns me and demonstrates a problem in the current design
> that Simon ran into as well: By using semaphores, the current design implies
> that enqueue and dequeue can be concurrent, but in fact that isn't true,
> since they both acquire m_mutex.
> 
> Perhaps semaphores could be used to achieve an alternative lock-free design,
> but that isn't what we have here.

If it's using semaphores then it's probably not lock-free. :-P

How much perf do we need here?  Note that the JSC GC's queueing, which scales to ~8 cores, does not use lock-freedom and protects itself with one lock.
Comment 26 Geoffrey Garen 2016-10-03 15:22:30 PDT
> How much perf do we need here?

I don't think this queue is going to be a bottleneck. It guards image decoding. So, the producer does a ~4ms decode and then enqueues and then the consumer does a dequeue while rendering a ~16ms frame. Meanwhile, this little queue takes like 100 microseconds max to lock and signal.
Comment 27 Said Abou-Hallawa 2016-10-04 13:27:08 PDT
Created attachment 290638 [details]
Patch
Comment 28 Said Abou-Hallawa 2016-10-04 13:31:01 PDT
Comment on attachment 290413 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290413&action=review

>> Source/WTF/ChangeLog:17
>> +           are controlled by two WatchableCounter: m_empty and m_full.
> 
> As a high-level comment, I think that SafeFixedQueue would look a lot cleaner if it did not use the Watchable/WatchableCounter abstraction.  I would have written it directly using a condition variable for empty and a condition variable for full.  I think that would have resulted in less code, and so fewer things that could go wrong.
> 
> On the other hand, I grok this code, so my objection is soft.

Watchable and WatchableCounter were removed. The design of SynchronizedFixedQueue is now simpler. It uses a single lock and a single condition to synchronize all the members of the class.
Comment 29 Said Abou-Hallawa 2016-10-04 13:47:09 PDT
(In reply to comment #19)
> (In reply to comment #18)
> > (In reply to comment #14)
> > > Comment on attachment 289700 [details]
> > > Patch
> > > 
> > > Can SynchronizedFixedQueue just use a single lock and condition variable
> > > instead of three?
> > > 
> > > I believe that enqueue can lock, check for shutdown, enqueue, and then
> > > broadcast, and dequeue can lock, wait for !isEmpty() && !m_isShutdown, and
> > > then dequeue. That's one lock and condition variable instead of three, and
> > > two fewer state variables (m_empty and m_full).
> > > 
> > > Why does SynchronizedFixedQueue::dequeue invoke its functor while holding
> > > the queue lock? That means I can't write an algorithm that optionally
> > > dequeues a second item because I will deadlock.
> > 
> > If we use a single lock, that will make each thread waits for the other
> > thread(s) to release the lock. For example, if a producer wants to insert a
> > request in the queue it has to wait for the the consumer thread to finish
> > even if the queue has empty spaces. I think we should make all threads
> > compete for a single lock only when accessing the queue.
> 
> I don't think you're getting any extra concurrency over what you'd get with
> just one lock.  The queue lock is the bottleneck either way.
> 
> > 
> > But to allow two threads to proceed at the same time without getting
> > incorrect value for the queue current count, two additional semaphores have
> > to be used, one for the fullCounter and the other for the emptyCounter. One
> > semaphore is used to block the thread until it can proceed and the other
> > semaphore is used to notify the other thread with completion.
> > 
> > Producer:
> >     m_emptyCounter.decrement(); // Blocks execution until there is an empty
> > space.
> >     {
> >         LockHolder lockHolder(m_mutex);
> >         m_queue.append(value);
> >     }
> >     m_fullCounter.increment(); // Notify the other thread that there is an
> > item for processing.
> > 
> > 
> > Consumer:
> >     m_fullCounter.decrement(); // Blocks execution until there is an item to
> > be processed.
> >     {
> >         LockHolder lockHolder(m_mutex);
> >         // Remove the first of the queue.
> >         value = m_queue.first();
> >         m_queue.removeFirst();
> >     }
> >     m_emptyCounter.increment(); // Notify the other threads that an item was
> > removed from the queue.
> 
> Or you could just do this:
> 
> template<typename T>
> class Queue {
> public:
>     void enqueue(T value)
>     {
>         LockHolder holder(m_lock);
>         while (m_data.size() >= limit)
>             m_cond.wait(m_lock);
>         m_data.append(value);
>         m_cond.notifyAll();
>     }
> 
>     T dequeue()
>     {
>         LockHolder locker(m_lock);
>         while (m_data.isEmpty())
>             m_cond.wait(m_lock);
>         T result = m_data.takeFirst();
>         m_cond.notifyAll();
>         return result;
>     }
> private:
>     Deque<T> m_data;
>     Lock m_lock;
>     Condition m_condition;
> };
> 
> This has one lock, one condition variable, and will probably perform as well
> as your queue.

I changed the class to be almost exactly as you suggested, except for one thing: the close() function. I need this function because the image decoding thread has to stay alive as long as the image has multiple frames and is animating, but I also need to terminate the thread when the Image object itself is deleted. To do that, dequeue() has to return a bool saying whether it returned because of a successful dequeue or because the queue was closed. And to make close() robust, I needed to track the active worker threads to be sure that by the end of close() all the threads have terminated. So I added registerWorker(), unregisterWorker() and isRegisteredWorker(). I keep the workers in a HashSet, and close() returns only when the size of the HashSet is zero.

The question I have is: I use the same condition, with different predicates, for closing the queue and terminating the workers as well as for enqueuing/dequeuing the elements. This seems to work, but should I create different conditions for different predicates?
Comment 30 Said Abou-Hallawa 2016-10-04 13:49:52 PDT
(In reply to comment #21)
> Comment on attachment 290413 [details]
> Patch
> 
> Here's a design for enqueue / dequeue that I think would work, with less
> complexity:
> 
> (1) enqueue:
> 
> LockHolder lockHolder(m_mutex);
> m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size() <
> BufferSize; };
> if (m_isShutDown)
>     return false;
> m_queue.append(value);
> m_condition.notifyAll();
> return true;
> 
> (2) dequeue:
> 
> LockHolder lockHolder(m_mutex);
> m_condition.wait(m_mutex, [] { return m_isShutDown || m_queue.size(); };
> if (m_isShutDown)
>     return false;
> value = m_queue.first();
> m_queue.removeFirst();
> m_condition.notifyAll();
> return true;

You are right. This design is simpler. I implemented it, in addition to a few other functions, to robustly handle the close() function.
Comment 31 Said Abou-Hallawa 2016-10-05 10:18:08 PDT
Created attachment 290722 [details]
Patch
Comment 32 Geoffrey Garen 2016-10-06 16:16:29 PDT
Comment on attachment 290722 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290722&action=review

> Source/WTF/wtf/SynchronizedFixedQueue.h:153
> +    HashSet<const void*> m_workers;

This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue.

I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them.

Another option is to use a separate data structure entirely.
Comment 33 Said Abou-Hallawa 2016-10-07 10:51:08 PDT
Created attachment 290944 [details]
Patch
Comment 34 Said Abou-Hallawa 2016-10-07 10:58:18 PDT
Comment on attachment 290722 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290722&action=review

>> Source/WTF/wtf/SynchronizedFixedQueue.h:153
>> +    HashSet<const void*> m_workers;
> 
> This is not a very good separation of concerns. Tracking a set of worker threads is unrelated to maintaining data elements in a queue.
> 
> I recommend putting this set of workers into its own data structure. For example, if you're going to use WTF::WorkQueue, that data structure can track outstanding workers and provide API for waiting on them.
> 
> Another option is to use a separate data structure entirely.

Tracking the workers was removed from SynchronizedFixedQueue. Instead I am using a BinarySemaphore in the caller of SynchronizedFixedQueue::close(). Here is how it works:

void Caller::start()
{
    workQueue()->dispatch([this] {
        while (m_synchronizedFixedQueue.dequeue(item))
            processItem(item);
        m_closeSemaphore.signal();
    });
}

void Caller::stop()
{
    m_synchronizedFixedQueue.close();
    m_closeSemaphore.wait(std::numeric_limits<double>::max());
}
Comment 35 Geoffrey Garen 2016-10-11 10:56:59 PDT
Comment on attachment 290944 [details]
Patch

View in context: https://bugs.webkit.org/attachment.cgi?id=290944&action=review

r=me

> Source/WTF/wtf/SynchronizedFixedQueue.h:47
> +    void open()
> +    {
> +        if (isOpen())
> +            return;

We should probably lock here. I think the branch on m_open is a race.
Comment 36 Said Abou-Hallawa 2016-10-11 11:40:00 PDT
Created attachment 291279 [details]
Patch
Comment 37 WebKit Commit Bot 2016-10-11 12:38:12 PDT
Comment on attachment 291279 [details]
Patch

Clearing flags on attachment: 291279

Committed r207156: <http://trac.webkit.org/changeset/207156>
Comment 38 WebKit Commit Bot 2016-10-11 12:38:20 PDT
All reviewed patches have been landed.  Closing bug.