Delete circular_queue_mp.h
This commit is contained in:
parent
e4d645cf41
commit
25b926510d
@ -1,200 +0,0 @@
|
|||||||
/*
|
|
||||||
circular_queue_mp.h - Implementation of a lock-free circular queue for EspSoftwareSerial.
|
|
||||||
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.
|
|
||||||
|
|
||||||
This library is free software; you can redistribute it and/or
|
|
||||||
modify it under the terms of the GNU Lesser General Public
|
|
||||||
License as published by the Free Software Foundation; either
|
|
||||||
version 2.1 of the License, or (at your option) any later version.
|
|
||||||
|
|
||||||
This library is distributed in the hope that it will be useful,
|
|
||||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
||||||
Lesser General Public License for more details.
|
|
||||||
|
|
||||||
You should have received a copy of the GNU Lesser General Public
|
|
||||||
License along with this library; if not, write to the Free Software
|
|
||||||
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef __circular_queue_mp_h
|
|
||||||
#define __circular_queue_mp_h
|
|
||||||
|
|
||||||
#include "circular_queue.h"
|
|
||||||
|
|
||||||
#ifdef ESP8266
|
|
||||||
#include "interrupts.h"
|
|
||||||
#else
|
|
||||||
#include <mutex>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Instance class for a multi-producer, single-consumer circular queue / ring buffer (FIFO).
|
|
||||||
This implementation is lock-free between producers and consumer for the available(), peek(),
|
|
||||||
pop(), and push() type functions, but is guarded to safely allow only a single producer
|
|
||||||
at any instant.
|
|
||||||
*/
|
|
||||||
template< typename T, typename ForEachArg = void >
|
|
||||||
class circular_queue_mp : protected circular_queue<T, ForEachArg>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
circular_queue_mp() = default;
|
|
||||||
circular_queue_mp(const size_t capacity) : circular_queue<T, ForEachArg>(capacity)
|
|
||||||
{}
|
|
||||||
circular_queue_mp(circular_queue<T, ForEachArg>&& cq) : circular_queue<T, ForEachArg>(std::move(cq))
|
|
||||||
{}
|
|
||||||
using circular_queue<T, ForEachArg>::operator=;
|
|
||||||
using circular_queue<T, ForEachArg>::capacity;
|
|
||||||
using circular_queue<T, ForEachArg>::flush;
|
|
||||||
using circular_queue<T, ForEachArg>::available;
|
|
||||||
using circular_queue<T, ForEachArg>::available_for_push;
|
|
||||||
using circular_queue<T, ForEachArg>::peek;
|
|
||||||
using circular_queue<T, ForEachArg>::pop;
|
|
||||||
using circular_queue<T, ForEachArg>::pop_n;
|
|
||||||
using circular_queue<T, ForEachArg>::for_each;
|
|
||||||
using circular_queue<T, ForEachArg>::for_each_rev_requeue;
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Resize the queue. The available elements in the queue are preserved.
|
|
||||||
This is not lock-free, but safe, concurrent producer or consumer access
|
|
||||||
is guarded.
|
|
||||||
@return True if the new capacity could accommodate the present elements in
|
|
||||||
the queue, otherwise nothing is done and false is returned.
|
|
||||||
*/
|
|
||||||
bool capacity(const size_t cap)
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
return circular_queue<T, ForEachArg>::capacity(cap);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool IRAM_ATTR push() = delete;
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Move the rvalue parameter into the queue, guarded
|
|
||||||
for multiple concurrent producers.
|
|
||||||
@return true if the queue accepted the value, false if the queue
|
|
||||||
was full.
|
|
||||||
*/
|
|
||||||
bool IRAM_ATTR push(T&& val)
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
return circular_queue<T, ForEachArg>::push(std::move(val));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Push a copy of the parameter into the queue, guarded
|
|
||||||
for multiple concurrent producers.
|
|
||||||
@return true if the queue accepted the value, false if the queue
|
|
||||||
was full.
|
|
||||||
*/
|
|
||||||
bool IRAM_ATTR push(const T& val)
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
return circular_queue<T, ForEachArg>::push(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Push copies of multiple elements from a buffer into the queue,
|
|
||||||
in order, beginning at buffer's head. This is guarded for
|
|
||||||
multiple producers, push_n() is atomic.
|
|
||||||
@return The number of elements actually copied into the queue, counted
|
|
||||||
from the buffer head.
|
|
||||||
*/
|
|
||||||
size_t push_n(const T* buffer, size_t size)
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
return circular_queue<T, ForEachArg>::push_n(buffer, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Pops the next available element from the queue, requeues
|
|
||||||
it immediately.
|
|
||||||
@return A reference to the just requeued element, or the default
|
|
||||||
value of type T if the queue is empty.
|
|
||||||
*/
|
|
||||||
T& pop_requeue();
|
|
||||||
|
|
||||||
/*!
|
|
||||||
@brief Iterate over, pop and optionally requeue each available element from the queue,
|
|
||||||
calling back fun with a reference of every single element.
|
|
||||||
Requeuing is dependent on the return boolean of the callback function. If it
|
|
||||||
returns true, the requeue occurs.
|
|
||||||
*/
|
|
||||||
bool for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun);
|
|
||||||
|
|
||||||
#ifndef ESP8266
|
|
||||||
protected:
|
|
||||||
std::mutex m_pushMtx;
|
|
||||||
#endif
|
|
||||||
};
|
|
||||||
|
|
||||||
template< typename T, typename ForEachArg >
|
|
||||||
T& circular_queue_mp<T>::pop_requeue()
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
const auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_acquire);
|
|
||||||
const auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
|
|
||||||
std::atomic_thread_fence(std::memory_order_acquire);
|
|
||||||
if (inPos == outPos) return circular_queue<T, ForEachArg>::defaultValue;
|
|
||||||
T& val = circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
|
|
||||||
const auto bufSize = circular_queue<T, ForEachArg>::m_bufSize;
|
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
|
||||||
circular_queue<T, ForEachArg>::m_outPos.store((outPos + 1) % bufSize, std::memory_order_relaxed);
|
|
||||||
circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % bufSize, std::memory_order_release);
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
template< typename T, typename ForEachArg >
|
|
||||||
bool circular_queue_mp<T>::for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun)
|
|
||||||
{
|
|
||||||
auto inPos0 = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_acquire);
|
|
||||||
auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_relaxed);
|
|
||||||
std::atomic_thread_fence(std::memory_order_acquire);
|
|
||||||
if (outPos == inPos0) return false;
|
|
||||||
do {
|
|
||||||
T&& val = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
|
|
||||||
if (fun(val))
|
|
||||||
{
|
|
||||||
#ifdef ESP8266
|
|
||||||
esp8266::InterruptLock lock;
|
|
||||||
#else
|
|
||||||
std::lock_guard<std::mutex> lock(m_pushMtx);
|
|
||||||
#endif
|
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
|
||||||
auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
|
|
||||||
std::atomic_thread_fence(std::memory_order_acquire);
|
|
||||||
circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(val);
|
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
|
||||||
circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % circular_queue<T, ForEachArg>::m_bufSize, std::memory_order_release);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
std::atomic_thread_fence(std::memory_order_release);
|
|
||||||
}
|
|
||||||
outPos = (outPos + 1) % circular_queue<T, ForEachArg>::m_bufSize;
|
|
||||||
circular_queue<T, ForEachArg>::m_outPos.store(outPos, std::memory_order_release);
|
|
||||||
} while (outPos != inPos0);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif // __circular_queue_mp_h
|
|
Loading…
Reference in New Issue
Block a user