mirror of
https://github.com/VectorCamp/vectorscan.git
synced 2025-06-28 16:41:01 +03:00
292 lines
9.1 KiB
C++
292 lines
9.1 KiB
C++
/*
|
|
* Copyright (c) 2016, Intel Corporation
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Intel Corporation nor the names of its contributors
|
|
* may be used to endorse or promote products derived from this software
|
|
* without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#ifndef BOUNDEDQUEUE_H
|
|
#define BOUNDEDQUEUE_H
|
|
|
|
#include <algorithm>
|
|
#include <cassert>
|
|
#include <condition_variable>
|
|
#include <memory>
|
|
#include <mutex>
|
|
#include <queue>
|
|
#include <type_traits>
|
|
#include <vector>
|
|
|
|
#include <boost/core/noncopyable.hpp>
|
|
|
|
//#define QUEUE_STATS 1
|
|
|
|
#ifdef QUEUE_STATS
|
|
|
|
#include <iostream>
|
|
|
|
class BoundedQueueStats {
|
|
public:
|
|
size_t pop = 0; //!< Number of pop operations.
|
|
size_t pop_block = 0; //!< Number of pop operations that had to block.
|
|
size_t push = 0; //!< Number of push operations.
|
|
size_t push_elements = 0; //!< Number of elements pushed.
|
|
size_t push_block = 0; //!< Number of push operations that had to block.
|
|
size_t refill = 0; //!< Number of refills done.
|
|
size_t stolen_from = 0; //!< Number of times we were stolen from.
|
|
|
|
void dump() const {
|
|
std::cout << "pop : " << pop << std::endl;
|
|
std::cout << "pop_block : " << pop_block << std::endl;
|
|
std::cout << "push : " << push << std::endl;
|
|
std::cout << "push_elements : " << push_elements << std::endl;
|
|
std::cout << "push_block : " << push_block << std::endl;
|
|
std::cout << "refill : " << refill << std::endl;
|
|
std::cout << "stolen_from : " << stolen_from << std::endl;
|
|
}
|
|
};
|
|
#endif
|
|
|
|
template<typename T>
|
|
class BoundedQueue : boost::noncopyable {
|
|
private:
|
|
// Encapsulates a queue and the mutex used to protect access to it.
|
|
class MutexQueue {
|
|
public:
|
|
// Forwarded queue operations.
|
|
void push(std::unique_ptr<T> elem) { q.push(std::move(elem)); }
|
|
void pop() { q.pop(); }
|
|
std::unique_ptr<T> &front() { return q.front(); }
|
|
bool empty() const { return q.empty(); }
|
|
size_t size() const { return q.size(); }
|
|
|
|
// Acquire the mutex lock.
|
|
std::unique_lock<std::mutex> lock() {
|
|
return std::unique_lock<std::mutex>(mutex);
|
|
}
|
|
|
|
#ifdef QUEUE_STATS
|
|
BoundedQueueStats stats;
|
|
#endif
|
|
|
|
private:
|
|
std::mutex mutex;
|
|
std::queue<std::unique_ptr<T>> q;
|
|
};
|
|
|
|
public:
|
|
BoundedQueue(size_t consumers, size_t size)
|
|
: max_elements(size), consumer_q(consumers) {
|
|
assert(consumers > 0);
|
|
assert(size > 0);
|
|
}
|
|
|
|
#ifdef QUEUE_STATS
|
|
~BoundedQueue() {
|
|
std::cout << "Global queue stats:" << std::endl;
|
|
global_q.stats.dump();
|
|
std::cout << std::endl;
|
|
for (size_t i = 0; i < consumer_q.size(); i++) {
|
|
std::cout << "Consumer queue " << i << ":" << std::endl;
|
|
consumer_q[i].stats.dump();
|
|
std::cout << std::endl;
|
|
}
|
|
}
|
|
#endif // QUEUE_STATS
|
|
|
|
void push(std::unique_ptr<T> elem) {
|
|
auto lock = global_q.lock();
|
|
|
|
#ifdef QUEUE_STATS
|
|
global_q.stats.push++;
|
|
global_q.stats.push_elements++;
|
|
if (global_q.size() >= max_elements) {
|
|
global_q.stats.push_block++;
|
|
}
|
|
#endif // QUEUE_STATS
|
|
|
|
// Block until queue is able to accept new elements.
|
|
cond_can_accept.wait(lock,
|
|
[&] { return global_q.size() < max_elements; });
|
|
assert(global_q.size() < max_elements);
|
|
|
|
global_q.push(std::move(elem));
|
|
cond_can_consume.notify_all();
|
|
}
|
|
|
|
template<class Iter>
|
|
void push(Iter begin, Iter end) {
|
|
using ElemType = typename std::remove_reference<decltype(*begin)>::type;
|
|
static_assert(std::is_same<ElemType, std::unique_ptr<T>>::value,
|
|
"Iterator must be over unique_ptr<T>");
|
|
|
|
if (begin == end) {
|
|
return;
|
|
}
|
|
|
|
auto lock = global_q.lock();
|
|
|
|
#ifdef QUEUE_STATS
|
|
global_q.stats.push++;
|
|
global_q.stats.push_elements += std::distance(begin, end);
|
|
if (global_q.size() >= max_elements) {
|
|
global_q.stats.push_block++;
|
|
}
|
|
#endif // QUEUE_STATS
|
|
|
|
// Block until queue is able to accept new elements.
|
|
cond_can_accept.wait(lock,
|
|
[&] { return global_q.size() < max_elements; });
|
|
assert(global_q.size() < max_elements);
|
|
|
|
for (auto it = begin; it != end; ++it) {
|
|
global_q.push(std::move(*it));
|
|
}
|
|
cond_can_consume.notify_all();
|
|
}
|
|
|
|
std::unique_ptr<T> pop(size_t consumer_id) {
|
|
assert(consumer_id < consumer_q.size());
|
|
auto &q = consumer_q[consumer_id];
|
|
|
|
// Try and satisfy the request from our per-consumer queue.
|
|
{
|
|
auto consumer_lock = q.lock();
|
|
if (!q.empty()) {
|
|
return pop_from_queue(q);
|
|
}
|
|
}
|
|
|
|
// Try and satisfy the request with a refill from the global queue.
|
|
{
|
|
auto lock = global_q.lock();
|
|
if (!global_q.empty()) {
|
|
auto consumer_lock = q.lock();
|
|
return refill_and_pop(q);
|
|
}
|
|
}
|
|
|
|
// Try and satisfy the request by stealing it from another queue.
|
|
for (size_t i = 1; i < consumer_q.size(); i++) {
|
|
size_t victim_id = (consumer_id + i) % consumer_q.size();
|
|
auto &victim_q = consumer_q[victim_id];
|
|
auto victim_lock = victim_q.lock();
|
|
// Note: we don't steal sentinel elements.
|
|
if (!victim_q.empty() && victim_q.front() != nullptr) {
|
|
#ifdef QUEUE_STATS
|
|
victim_q.stats.stolen_from++;
|
|
#endif
|
|
return pop_from_queue(victim_q);
|
|
}
|
|
}
|
|
|
|
// All avenues exhausted, we must block until we've received a new
|
|
// element.
|
|
auto lock = global_q.lock();
|
|
#ifdef QUEUE_STATS
|
|
global_q.stats.pop_block++;
|
|
#endif
|
|
cond_can_consume.wait(lock, [&]{ return !global_q.empty(); });
|
|
assert(!global_q.empty());
|
|
auto consumer_lock = q.lock();
|
|
return refill_and_pop(q);
|
|
}
|
|
|
|
private:
|
|
std::unique_ptr<T> pop_from_queue(MutexQueue &q) {
|
|
assert(!q.empty());
|
|
auto elem = std::move(q.front());
|
|
q.pop();
|
|
#ifdef QUEUE_STATS
|
|
q.stats.pop++;
|
|
#endif
|
|
return elem;
|
|
}
|
|
|
|
std::unique_ptr<T> refill_and_pop(MutexQueue &q) {
|
|
assert(!global_q.empty());
|
|
|
|
#ifdef QUEUE_STATS
|
|
q.stats.refill++;
|
|
#endif
|
|
|
|
auto elem = pop_from_queue(global_q);
|
|
if (elem == nullptr) {
|
|
return elem; // Sentinel.
|
|
}
|
|
|
|
// Grab all subsequent elements that share the same ID.
|
|
const auto &id = elem->id;
|
|
while (!global_q.empty()) {
|
|
auto &first = global_q.front();
|
|
if (first == nullptr) {
|
|
#ifdef QUEUE_STATS
|
|
q.stats.push++;
|
|
q.stats.push_elements++;
|
|
#endif
|
|
// Sentinel element. We can grab one, but no more.
|
|
q.push(pop_from_queue(global_q));
|
|
break;
|
|
}
|
|
if (first->id != id) {
|
|
break;
|
|
}
|
|
#ifdef QUEUE_STATS
|
|
q.stats.push++;
|
|
q.stats.push_elements++;
|
|
#endif
|
|
q.push(pop_from_queue(global_q));
|
|
}
|
|
|
|
if (global_q.size() < max_elements) {
|
|
cond_can_accept.notify_all();
|
|
}
|
|
|
|
return elem;
|
|
}
|
|
|
|
// Maximum number of elements in the global queue (subsequent push
|
|
// operations will block). Note that we may overshoot this value when
|
|
// handling bulk push operations.
|
|
const size_t max_elements;
|
|
|
|
// Global queue.
|
|
MutexQueue global_q;
|
|
|
|
// Per-consumer queues.
|
|
std::vector<MutexQueue> consumer_q;
|
|
|
|
// Condition variable for producers to wait on when the queue is full.
|
|
std::condition_variable cond_can_accept;
|
|
|
|
// Condition variable for consumers to wait on when the queue is empty.
|
|
std::condition_variable cond_can_consume;
|
|
};
|
|
|
|
#ifdef QUEUE_STATS
|
|
#undef QUEUE_STATS
|
|
#endif
|
|
|
|
#endif // BOUNDEDQUEUE_H
|