RDKit
Open-source cheminformatics and machine learning.
Loading...
Searching...
No Matches
ConcurrentQueue.h
Go to the documentation of this file.
1//
2// Copyright (C) 2020 Shrey Aryan
3//
4// @@ All Rights Reserved @@
5// This file is part of the RDKit.
6// The contents are covered by the terms of the BSD license
7// which is included in the file license.txt, found at the root
8// of the RDKit source tree.
9//
10#ifdef RDK_BUILD_THREADSAFE_SSS
11#ifndef CONCURRENT_QUEUE
12#define CONCURRENT_QUEUE
13#include <condition_variable>
14#include <thread>
15#include <vector>
16
17namespace RDKit {
//! A fixed-capacity FIFO queue that is shared between threads.
/*!
  Elements live in a ring buffer of d_capacity slots. d_head and d_tail are
  running counters; the slot they refer to is the counter modulo d_capacity,
  the queue is empty when they are equal and full when they differ by
  d_capacity. Every member function takes d_lock before touching the state.

  \tparam E element type; the buffer is created with d_capacity
            default-constructed elements and push()/pop() copy-assign
            into and out of it.
*/
template <typename E>
class ConcurrentQueue {
 private:
  unsigned int d_capacity;      //!< number of slots in the ring buffer
  bool d_done;                  //!< set by setDone(), read by pop()/getDone()
  std::vector<E> d_elements;    //!< the ring buffer itself
  unsigned int d_head, d_tail;  //!< next position to pop / next position to push
  mutable std::mutex d_lock;    //!< guards all of the members above
  std::condition_variable d_notEmpty, d_notFull;

 private:
  //! the queue owns a mutex and condition variables, so it is not copyable
  ConcurrentQueue(const ConcurrentQueue<E> &) = delete;
  ConcurrentQueue &operator=(const ConcurrentQueue<E> &) = delete;

 public:
  //! creates an empty queue with room for \c capacity elements
  /*!
    The buffer is sized directly in the initializer list, so it is built
    exactly once and no element is copied during construction.
  */
  ConcurrentQueue(unsigned int capacity)
      : d_capacity(capacity),
        d_done(false),
        d_elements(capacity),
        d_head(0),
        d_tail(0) {}

  //! tries to push an element into the queue if it is not full without
  //! modifying the variable element, if the queue is full then pushing an
  //! element will result in blocking
  void push(const E &element);

  //! tries to pop an element from the queue if it is not empty and not done;
  //! the boolean value indicates whether popping was successful, if the
  //! queue is empty and not done then popping an element will result in
  //! blocking
  bool pop(E &element);

  //! checks whether the ConcurrentQueue is empty
  bool isEmpty() const;

  //! returns the value of the variable done
  bool getDone() const;

  //! sets the variable d_done = true
  void setDone();

  //! clears the underlying element storage
  void clear();
};
62
63template <typename E>
64void ConcurrentQueue<E>::push(const E &element) {
65 std::unique_lock<std::mutex> lk(d_lock);
66 //! concurrent queue is full so we wait until
67 //! it is not full
68
69 while (d_head + d_capacity == d_tail) {
70 d_notFull.wait(lk);
71 }
72 bool wasEmpty = (d_head == d_tail);
73 d_elements.at(d_tail % d_capacity) = element;
74 d_tail++;
75 //! if the concurrent queue was empty before
76 //! then it is not any more since we have "pushed" an element
77 //! thus we notify all the consumer threads
78 if (wasEmpty) {
79 d_notEmpty.notify_all();
80 }
81}
82
83template <typename E>
84bool ConcurrentQueue<E>::pop(E &element) {
85 std::unique_lock<std::mutex> lk(d_lock);
86 //! concurrent queue is empty so we wait until
87 //! it is not empty
88 while (d_head == d_tail) {
89 if (d_done) {
90 return false;
91 }
92 d_notEmpty.wait(lk);
93 }
94 bool wasFull = (d_head + d_capacity == d_tail);
95 element = d_elements.at(d_head % d_capacity);
96 d_head++;
97 //! if the concurrent queue was full before
98 //! then it is not any more since we have "popped" an element
99 //! thus we notify all producer threads
100 if (wasFull) {
101 d_notFull.notify_all();
102 }
103 return true;
104}
105
106template <typename E>
107bool ConcurrentQueue<E>::isEmpty() const {
108 std::unique_lock<std::mutex> lk(d_lock);
109 return (d_head == d_tail);
110}
111
112template <typename E>
113bool ConcurrentQueue<E>::getDone() const {
114 std::unique_lock<std::mutex> lk(d_lock);
115 return d_done;
116}
117
118template <typename E>
119void ConcurrentQueue<E>::setDone() {
120 std::unique_lock<std::mutex> lk(d_lock);
121 d_done = true;
122 d_notEmpty.notify_all();
123}
124
125template <typename E>
126void ConcurrentQueue<E>::clear() {
127 std::unique_lock<std::mutex> lk(d_lock);
128 d_elements.clear();
129}
130
131} // namespace RDKit
132#endif
133#endif
Std stuff.