OpenLibrary
ts_queue.hpp
1 
2 /*
3  Thread safe wrapper for queue-like containers
4  Copyright (C) 2013 Michał Walenciak <Kicer86@gmail.com>
5 
6  This program is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 
21 #ifndef OPENLIBRARY_PALGORITHM_TS_QUEUE
22 #define OPENLIBRARY_PALGORITHM_TS_QUEUE
23 
24 #include <cassert>
25 
26 #include <atomic>
27 #include <condition_variable>
28 #include <deque>
29 #include <mutex>
30 
31 #include "utils/optional.hpp"
32 
33 namespace ol
34 {
36 
51  template<typename T>
52  class TS_Queue
53  {
54  public:
57  TS_Queue(size_t max_size):
58  m_queue(),
59  m_is_not_full(),
60  m_is_not_empty(),
61  m_queue_mutex(),
62  m_max_size(max_size),
63  m_stopped(false)
64  {
65 
66  }
67 
69 
70  virtual ~TS_Queue()
71  {
72  stop();
73 
74  std::unique_lock<std::mutex> lock(m_queue_mutex);
75  assert(m_queue.empty());
76  }
77 
79 
82  void push(const T &item)
83  {
84  assert(m_stopped == false);
85 
86  if (m_stopped == false)
87  {
88  std::unique_lock<std::mutex> lock(m_queue_mutex);
89 
90  m_is_not_full.wait(lock, [&] { return m_queue.size() < m_max_size; } ); //wait for conditional_variable if there is no place in queue
91  m_queue.push_back(item);
92  m_is_not_empty.notify_one();
93  }
94  }
95 
96  [[deprecated]]
97  void push_front(const T &item)
98  {
99  push(item);
100  }
101 
103 
105  void push(T&& item)
106  {
107  assert(m_stopped == false);
108 
109  if (m_stopped == false)
110  {
111  std::unique_lock<std::mutex> lock(m_queue_mutex);
112 
113  m_is_not_full.wait(lock, [&] { return m_queue.size() < m_max_size; } ); //wait for conditional_variable if there is no place in queue
114  m_queue.push_back(std::move(item));
115  m_is_not_empty.notify_one();
116  }
117  }
118 
119  [[deprecated]]
120  void push_back(T&& item)
121  {
122  push(std::move(item));
123  }
124 
126 
132  {
133  std::unique_lock<std::mutex> lock(m_queue_mutex);
134  Optional<T> result;
135 
136  wait_for_data(lock);
137 
138  if (m_queue.empty() == false)
139  {
140  result = std::move( *(m_queue.begin()) );
141  m_queue.pop_front();
142  m_is_not_full.notify_one();
143  }
144 
145  return std::move(result);
146  }
147 
148  [[deprecated]]
149  Optional<T> pop_front()
150  {
151  return pop();
152  }
153 
155 
161  Optional<T> pop_for(const std::chrono::milliseconds& timeout)
162  {
163  std::unique_lock<std::mutex> lock(m_queue_mutex);
164  Optional<T> result;
165 
166  const bool status = wait_for_data(lock, timeout);
167 
168  if (status && m_queue.empty() == false)
169  {
170  result = std::move( *(m_queue.begin()) );
171  m_queue.pop_front();
172  m_is_not_full.notify_one();
173  }
174 
175  return std::move(result);
176  }
177 
179  size_t size() const
180  {
181  std::unique_lock<std::mutex> lock(m_queue_mutex);
182  const size_t result = m_queue.size();
183  return result;
184  }
185 
187  bool empty() const
188  {
189  std::unique_lock<std::mutex> lock(m_queue_mutex);
190  const bool result = m_queue.empty();
191  return result;
192  }
193 
195 
196  void stop()
197  {
198  m_stopped = true;
199  m_is_not_empty.notify_all();
200  }
201 
204  {
205  std::unique_lock<std::mutex> lock(m_queue_mutex);
206  Optional<T> result;
207 
208  wait_for_data(lock);
209  }
210 
211  private:
212  std::deque<T> m_queue;
213  std::condition_variable m_is_not_full;
214  std::condition_variable m_is_not_empty;
215  mutable std::mutex m_queue_mutex;
216  size_t m_max_size;
217  std::atomic<bool> m_stopped;
218 
219  void wait_for_data(std::unique_lock<std::mutex>& lock)
220  {
221  auto precond = [&]
222  {
223  const bool condition = m_stopped == false && m_queue.empty();
224  return !condition;
225  };
226 
227  m_is_not_empty.wait(lock, precond);
228  }
229 
230  bool wait_for_data(std::unique_lock<std::mutex>& lock, const std::chrono::milliseconds& timeout)
231  {
232  auto precond = [&]
233  {
234  const bool condition = m_stopped == false && m_queue.empty();
235  return !condition;
236  };
237 
238  const bool status = m_is_not_empty.wait_for(lock, timeout, precond);
239 
240  return status;
241  }
242  };
243 
244 }
245 #endif
TS_Queue(size_t max_size)
Definition: ts_queue.hpp:57
size_t size() const
Take objects count.
Definition: ts_queue.hpp:179
Definition: optional.hpp:27
void stop()
Release all threads waiting in TS_Queue::pop().
Definition: ts_queue.hpp:196
Optional< T > pop_for(const std::chrono::milliseconds &timeout)
Get data.
Definition: ts_queue.hpp:161
bool empty() const
Check if TS_Queue is empty.
Definition: ts_queue.hpp:187
Thread safe queue.
Definition: ts_queue.hpp:52
void push(T &&item)
Write data to TS_Queue.
Definition: ts_queue.hpp:105
virtual ~TS_Queue()
Destructor.
Definition: ts_queue.hpp:70
Definition: debug.hpp:45
void push(const T &item)
Write data to TS_Queue.
Definition: ts_queue.hpp:82
Optional< T > pop()
Get data.
Definition: ts_queue.hpp:131
void wait_for_data()
Wait until data is available.
Definition: ts_queue.hpp:203