目錄0 導引1 隊列數據結構2 共享內存獲取2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 消息發(fā)送邏輯3
iceoryx源碼閱讀(一)——全局概覽
iceoryx源碼閱讀(二)——共享內存管理
iceoryx源碼閱讀(三)——共享內存管理(一)
iceoryx源碼閱讀(四)——共享內存通信(二)
iceoryx源碼閱讀(五)——共享內存通信(三)
iceoryx源碼閱讀(六)——共享內存創(chuàng)建
iceoryx源碼閱讀(七)——服務發(fā)現機制
iceoryx源碼閱讀(八)——IPC通信機制
本文閱讀與共享內存通信相關的邏輯。發(fā)布者首先獲取一塊共享內存,往其中寫入數據,然后向消息隊列中推入消息描述數據,訂閱者從消息隊列中讀取消息描述數據。本文從四方面進行解讀:隊列數據結構、共享內存獲取、消息發(fā)送邏輯、消息接收邏輯。
根據前文知道,隊列元素為
ShmSafeUnmanagedChunk
,其中存放的是
ChunkManagement
所在共享內存段的id和相對該共享內存首地址的偏移,具體如下所示:
消息隊列由如下代碼定義:
struct ChunkQueueData : public LockingPolicy
{
// ...
static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
cxx::VariantQueue m_queue;
// ...
};
struct ChunkDistributorData : public LockingPolicy
{
// ...
using QueueContainer_t =
cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
QueueContainer_t m_queues;
// ...
};
struct ChunkReceiverData : public ChunkQueueDataType
{
// ...
};
ChunkDistributorData
是發(fā)布者所持有的隊列數據結構,由于一個發(fā)布者會分發(fā)至多個訂閱端,所以持有多個隊列。
ChunkReceiverData
是訂閱者的組件,它繼承自
ChunkQueueData
,內部只有一個隊列,隊列元素類型為
ShmSafeUnmanagedChunk
。
上述代碼中,隊列數據結構的類型為
cxx::VariantQueue
。從類名看,是一個變長數組,但實際上這是一個定長數組,以下是相關數據結構定義:
enum class VariantQueueTypes : uint64_t
{
FiFo_SingleProducerSingleConsumer = 0,
SoFi_SingleProducerSingleConsumer = 1,
FiFo_MultiProducerSingleConsumer = 2,
SoFi_MultiProducerSingleConsumer = 3
};
template
class VariantQueue
{
public:
using fifo_t = variant,
concurrent::SoFi,
concurrent::ResizeableLockFreeQueue,
concurrent::ResizeableLockFreeQueue>;
// ...
private:
VariantQueueTypes m_type;
fifo_t m_fifo;
};
fifo_t
是隊列底層結構類型,可能是
concurrent::FiFo
、
concurrent::SoFi
、
concurrent::ResizeableLockFreeQueue
之一,至于使用哪一種,由枚舉值
m_type
確定。這三個內部會依賴以下數據結構:
template
struct NonZeroedBuffer
{
struct alignas(ElementType) element_t
{
cxx::byte_t data[sizeof(ElementType)];
};
element_t value[Capacity];
};
上面這一結構本質就是一個數組,其元素類型類型為Element。
發(fā)送數據前,應用程序首先需要先獲取一塊合適大小的Chunk,往其中寫入數據,然后調用消息發(fā)送接口進行發(fā)送。
職責:
獲取一塊共享內存,并調用構造函數進行初始化。
入參:
args:模板變參,用于調用待傳類型的構造函數,也可以不傳。
返回:
Sample類型實例,本質是對用戶可操作的共享內存段的封裝。
template
template
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}
整體代碼分析:
首先調用loanSample方法獲取共享內存,然后調用構造函數進行初始化,這里使用Placement new語法。需要指出的是,loanSample返回的是將用于存放用戶數據的首地址,而不是Chunk的首地址。
職責:
分配共享內存,并將其轉換為Sample類型,并返回。
返回:
Sample類型實例。
template
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};
auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
if (result.has_error())
{
return cxx::error(result.get_error());
}
else
{
return cxx::success>(convertChunkHeaderToSample(result.value()));
}
}
整體代碼分析:
首先調用
tryAllocateChunk
獲得一塊共享內存,并構造Sample實例。
職責:
分配共享內存,并將其轉換為Sample類型,并返回。
入參:
4個用于計算所需共享內存大小的參數,這里不展開介紹了。
返回值:
共享內存首地址(類型為
ChunkHeader *
,見
4.1 Chunk管理結構
)
cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
return m_chunkSender.tryAllocate(
getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}
整體代碼分析:
上述函數只是簡單地調用
ChunkSender
的
tryAllocate
方法。
職責:
調用 MemoryManager的成員方法getChunk 得到共享內存塊或復用最后一次使用的共享內存塊。
入參:
同上(略)
返回值:
指向共享內存塊首地址的指針,類型為
ChunkHeader
。
template
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
const auto chunkSettingsResult =
mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
if (chunkSettingsResult.has_error())
{
return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
}
const auto& chunkSettings = chunkSettingsResult.value();
const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();
auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
mepoo::ChunkHeader* lastChunkChunkHeader =
lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;
if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
{
/* * * * * 見代碼段2-4-1:復用最近一次分配的共享內存 * * * * */
}
else
{
/* * * * * 見代碼段2-4-2:分配一塊新的未使用的共享內存 * * * * */
}
}
逐段代碼分析:
LINE 09 ~ LINE 17: 計算所需共享內存大小。
LINE 19 ~ LINE 30: 判斷最近一次分配的共享內存塊是否所有訂閱者都已讀取,并且大小超過所需大小,則復用最近一次分配的共享內存塊,否則新分配共享內存塊。
代碼段2-4-1:復用最近一次分配的共享內存
auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
auto chunkSize = lastChunkChunkHeader->chunkSize();
lastChunkChunkHeader->~ChunkHeader();
new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
lastChunkChunkHeader->setOriginId(originId);
return cxx::success(lastChunkChunkHeader);
}
else
{
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
整體代碼分析:
如果正在使用的共享內存塊未滿,則插入,并析構之前的數據,同時在這塊內存上構造新的
ChunkHeader
;否則返回錯誤。
代碼段2-4-2:分配一塊新的未使用的共享內存
auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);
if (!getChunkResult.has_error())
{
auto& chunk = getChunkResult.value();
// if the application allocated too much chunks, return no more chunks
if (getMembers()->m_chunksInUse.insert(chunk))
{
// END of critical section
chunk.getChunkHeader()->setOriginId(originId);
return cxx::success(chunk.getChunkHeader());
}
else
{
// release the allocated chunk
chunk = nullptr;
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
}
else
{
/// @todo iox-#1012 use cxx::error::from(E1); once available
return cxx::error(cxx::into(getChunkResult.get_error()));
}
整體代碼分析:
調用MemoryManager的成員方法getChunk獲取共享內存塊,如果獲取成功,存入數組
m_chunksInUse
。如果獲取失敗或數組已滿,則返回獲取失敗,此時根據RAII原理,
SharedChunk
的析構函數會自動將共享內存塊返還給
MemPool
。
m_chunksInUse
內部封裝的數組元素的類型為我們在
上一篇文章
中介紹的
ShmSafeUnmanagedChunk
,這個類型不具有引用計數,為什么退出作用域不會被析構?
為什么要存
m_chunksInUse
數組?原因如下:我們看到
tryAllocate
返回的是消息內存塊的指針,而消息發(fā)送的時候需要使用
SharedChunk
,我們無法將前者轉換為后者。所以,此處存入數組,消息發(fā)送函數中通過消息內存塊的指針查找對應數組元素,恢復出
SharedChunk
實例,
具體見3.3
。
本質是往消息隊列推入消息描述結構
ShmSafeUnmanagedChunk
。
職責:
上層應用程序調用此方法推送消息。
入參:
sample
:用戶負載數據的封裝實例。
template
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
port().sendChunk(chunkHeader);
}
整體代碼分析:
上述代碼從
sample
中取出用戶負載數據指針,據此計算
Chunk
首地址,然后調用
sendChunk
進行發(fā)送。
根據用戶負載數據指針計算
Chunk
首地址其實就是減去一個偏移量,具體計算方法如下:
ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
if (userPayload == nullptr)
{
return nullptr;
}
uint64_t userPayloadAddress = reinterpret_cast(userPayload);
auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
return reinterpret_cast(userPayloadAddress - *backOffset);
}
其中偏移放在payload之前,即:
*backOffset
。
職責:
發(fā)送用戶數據。
入參:
chunkHeader
:
ChunkHeader
類型的指針,
Chunk
的首地址。
void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);
if (offerRequested)
{
m_chunkSender.send(chunkHeader);
}
else
{
m_chunkSender.pushToHistory(chunkHeader);
}
}
整體代碼分析:
職責:
發(fā)送用戶數據。
入參:
chunkHeader
:
ChunkHeader
指針,
Chunk
的首地址。
template
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
uint64_t numberOfReceiverTheChunkWasDelivered{0};
mepoo::SharedChunk chunk(nullptr);
// BEGIN of critical section, chunk will be lost if the process terminates in this section
if (getChunkReadyForSend(chunkHeader, chunk))
{
numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);
getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
getMembers()->m_lastChunkUnmanaged = chunk;
}
// END of critical section
return numberOfReceiverTheChunkWasDelivered;
}
逐段代碼分析:
LINE 05 ~ LINE 07:
根據
chunkHeader
指針和
m_chunksInUse
數組,恢復
SharedChunk
實例;
LINE 09 ~ LINE 09:
調用基類的成員方法
deliverToAllStoredQueues
向各隊列發(fā)送(推入)消息;
LINE 11 ~ LINE 12:
更新
m_lastChunkUnmanaged
實例,以提升性能。
template
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
typename ChunkDistributorDataType::QueueContainer_t remainingQueues;
/* * * * * 見代碼段3-3-1:向隊列發(fā)送消息,失敗入remainingQueues * * * * */
/* * * * * 見代碼段3-3-2:發(fā)送失敗的不斷嘗試重新發(fā)送 * * * * */
addToHistoryWithoutDelivery(chunk);
return numberOfQueuesTheChunkWasDeliveredTo;
}
整體代碼分析:
這部分沒有什么內容,主要實現在代碼段3-3-1和代碼段3-3-2。
代碼段3-3-1:
{
{
typename MemberType_t::LockGuard_t lock(*getMembers());
bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
// send to all the queues
for (auto& queue : getMembers()->m_queues)
{
bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);
if (pushToQueue(queue.get(), chunk))
{
++numberOfQueuesTheChunkWasDeliveredTo;
}
else
{
if (isBlockingQueue)
{
remainingQueues.emplace_back(queue);
}
else
{
++numberOfQueuesTheChunkWasDeliveredTo;
ChunkQueuePusher_t(queue.get()).lostAChunk();
}
}
}
}
整體代碼分析:
這段代碼整體上是遍歷所有訂閱者隊列,調用
pushToQueue
向消息隊列推入消息,實現消息發(fā)送。但是消息隊列的長度是有限的,如果由于訂閱者處理速度太慢,隊列滿了應該怎么處理,根據設置,可以選擇兩種應對策略:
將隊列保存下來(LINE 17 ~ LINE 20),后續(xù)對這些隊列不斷嘗試發(fā)送,直到所有隊列推送成功,見代碼段3-3-2;
將隊列標記為 有消息丟失 (LINE 22 ~ LINE 25):
template
inline void ChunkQueuePusher::lostAChunk() noexcept
{
getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}
代碼段3-3-2:不斷嘗試發(fā)送,直到所有消息發(fā)送成功
cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
adaptiveWait.wait();
{
typename MemberType_t::LockGuard_t lock(*getMembers());
/* * * * * 見代碼段3-3-3:與活躍隊列求交 * * * * */
for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
{
if (pushToQueue(remainingQueues[i].get(), chunk))
{
remainingQueues.erase(remainingQueues.begin() + i);
++numberOfQueuesTheChunkWasDeliveredTo;
}
if (i == 0U)
{
break;
}
}
}
}
整體代碼分析:
這部分代碼就是對剩余未發(fā)送成功的隊列進行重新發(fā)送,直到所有隊列發(fā)送成功。每輪嘗試中間會使用yield或sleep函數等待一段時間,以免不必要的性能浪費。同時,發(fā)送過程中,還會與當前活躍隊列求交,如下:
代碼段3-3-3:與活躍隊列求交
typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);
auto iter = std::set_intersection(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
remainingQueues.begin(),
remainingQueues.end(),
queueIntersection.begin(),
greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;
整體代碼分析:
上面這段代碼就是求解
remainingQueues
和當前活躍隊列
m_queues
交集,以免發(fā)生無限循環(huán)。
set_intersection
是C++標準庫函數,詳見:
https://en.cppreference.com/w/cpp/algorithm/set_intersection
至此,消息發(fā)送的流程分析完畢。
本文介紹了消息發(fā)布者獲取共享內存塊和發(fā)送邏輯,下文將介紹消息訂閱者的接收邏輯。