Skip to content

Commit 078156b

Browse files
authored
Introduce Semaphore class & fix deadlock in PooledThreadExecutor (#97)
The problem is calling |wait| after a |notify_all| blocks the thread. The semaphore solves this problem by not blocking if we have enough "credits" that have been released.
1 parent 2fcd72b commit 078156b

File tree

5 files changed

+111
-9
lines changed

5 files changed

+111
-9
lines changed

aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
#include <functional>
2323
#include <future>
2424
#include <mutex>
25-
#include <condition_variable>
25+
#include <aws/core/utils/threading/Semaphore.h>
2626

2727
namespace Aws
2828
{
@@ -99,8 +99,7 @@ namespace Aws
9999
private:
100100
Aws::Queue<std::function<void()>*> m_tasks;
101101
std::mutex m_queueLock;
102-
std::mutex m_syncPointLock;
103-
std::condition_variable m_syncPoint;
102+
Aws::Utils::Threading::Semaphore m_sync;
104103
Aws::Vector<ThreadTask*> m_threadTaskHandles;
105104
size_t m_poolSize;
106105
OverflowPolicy m_overflowPolicy;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
#pragma once
17+
18+
#include <aws/core/Core_EXPORTS.h>
19+
#include <mutex>
20+
#include <condition_variable>
21+
22+
namespace Aws
23+
{
24+
namespace Utils
25+
{
26+
namespace Threading
27+
{
28+
class AWS_CORE_API Semaphore {
29+
public:
30+
/**
31+
* Initializes a new instance of Semaphore class specifying the initial number of entries and
32+
* the maximum number of concurrent entries.
33+
*/
34+
Semaphore(size_t initialCount, size_t maxCount);
35+
/**
36+
* Blocks the current thread until it receives a signal.
37+
*/
38+
void WaitOne();
39+
/**
40+
* Exits the semaphore once.
41+
*/
42+
void Release();
43+
/**
44+
* Exit the semaphore up to the maximum number of entries available.
45+
*/
46+
void ReleaseAll();
47+
private:
48+
size_t m_count;
49+
const size_t m_maxCount;
50+
std::mutex m_mutex;
51+
std::condition_variable m_syncPoint;
52+
};
53+
}
54+
}
55+
}

aws-cpp-sdk-core/source/utils/threading/Executor.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
2929
}
3030

3131
PooledThreadExecutor::PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy) :
32-
m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
32+
m_sync(0, poolSize), m_poolSize(poolSize), m_overflowPolicy(overflowPolicy)
3333
{
3434
for (size_t index = 0; index < m_poolSize; ++index)
3535
{
@@ -41,10 +41,10 @@ PooledThreadExecutor::~PooledThreadExecutor()
4141
{
4242
for(auto threadTask : m_threadTaskHandles)
4343
{
44-
threadTask->StopProcessingWork();
44+
threadTask->StopProcessingWork();
4545
}
4646

47-
m_syncPoint.notify_all();
47+
m_sync.ReleaseAll();
4848

4949
for (auto threadTask : m_threadTaskHandles)
5050
{
@@ -80,7 +80,7 @@ bool PooledThreadExecutor::SubmitToThread(std::function<void()>&& fn)
8080
m_tasks.push(fnCpy);
8181
}
8282

83-
m_syncPoint.notify_one();
83+
m_sync.Release();
8484

8585
return true;
8686
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
#include <aws/core/utils/threading/Semaphore.h>
17+
#include <algorithm>
18+
19+
using namespace Aws::Utils::Threading;
20+
21+
Semaphore::Semaphore(size_t initialCount, size_t maxCount)
22+
: m_count(initialCount), m_maxCount(maxCount)
23+
{
24+
}
25+
26+
void Semaphore::WaitOne()
27+
{
28+
std::unique_lock<std::mutex> locker(m_mutex);
29+
if(0 == m_count)
30+
{
31+
m_syncPoint.wait(locker, [this] { return m_count > 0; });
32+
}
33+
--m_count;
34+
}
35+
36+
void Semaphore::Release()
37+
{
38+
std::lock_guard<std::mutex> locker(m_mutex);
39+
m_count = std::min(m_maxCount, m_count + 1);
40+
m_syncPoint.notify_one();
41+
}
42+
43+
void Semaphore::ReleaseAll()
44+
{
45+
std::lock_guard<std::mutex> locker(m_mutex);
46+
m_count = m_maxCount;
47+
m_syncPoint.notify_all();
48+
}
49+

aws-cpp-sdk-core/source/utils/threading/ThreadTask.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,9 @@ void ThreadTask::MainTaskRunner()
4343
}
4444
}
4545

46-
std::unique_lock<std::mutex> locker(m_executor.m_syncPointLock);
4746
if(m_continue)
4847
{
49-
m_executor.m_syncPoint.wait(locker);
48+
m_executor.m_sync.WaitOne();
5049
}
5150
}
5251
}

0 commit comments

Comments
 (0)