您的位置:首頁 > 軟件教程 > 教程 > iceoryx源碼閱讀(四)——共享內存通信(二)

iceoryx源碼閱讀(四)——共享內存通信(二)

來源:好特整理 | 時間:2024-05-29 08:55:57 | 閱讀:60 |  標簽: C 閱讀 通信 CEO   | 分享到:

目錄0 導引1 隊列數據結構2 共享內存獲取2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 消息發(fā)送邏輯3

0 導引

  • iceoryx源碼閱讀(一)——全局概覽

  • iceoryx源碼閱讀(二)——共享內存管理

  • iceoryx源碼閱讀(三)——共享內存管理(一)

  • iceoryx源碼閱讀(四)——共享內存通信(二)

  • iceoryx源碼閱讀(五)——共享內存通信(三)

  • iceoryx源碼閱讀(六)——共享內存創(chuàng)建

  • iceoryx源碼閱讀(七)——服務發(fā)現機制

  • iceoryx源碼閱讀(八)——IPC通信機制

本文閱讀與共享內存通信相關的邏輯。發(fā)布者首先獲取一塊共享內存,往其中寫入數據,然后向消息隊列中推入消息描述數據,訂閱者從消息隊列中讀取消息描述數據。本文從四方面進行解讀:隊列數據結構、共享內存獲取、消息發(fā)送邏輯、消息接收邏輯。

1 隊列數據結構

根據前文知道,隊列元素為 ShmSafeUnmanagedChunk ,其中存放的是 ChunkManagement 所在共享內存段的id和相對該共享內存首地址的偏移,具體如下所示:

iceoryx源碼閱讀(四)——共享內存通信(二)

消息隊列由如下代碼定義:

struct ChunkQueueData : public LockingPolicy
{
    // ...
    static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
    cxx::VariantQueue m_queue;
    // ...
};

struct ChunkDistributorData : public LockingPolicy
{
    // ...
    using QueueContainer_t =
    cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
    QueueContainer_t m_queues;
    // ...
};

struct ChunkReceiverData : public ChunkQueueDataType
{
    // ...
};
  • ChunkDistributorData 是發(fā)布者所持有的隊列數據結構,由于一個發(fā)布者會分發(fā)至多個訂閱端,所以持有多個隊列。

  • ChunkReceiverData 是訂閱者的組件,它繼承自 ChunkQueueData ,內部只有一個隊列,隊列元素類型為 ShmSafeUnmanagedChunk 。

上述代碼中,隊列數據結構的類型為 cxx::VariantQueue 。從類名看,是一個變長數組,但實際上這是一個定長數組,以下是相關數據結構定義:

enum class VariantQueueTypes : uint64_t
{
    FiFo_SingleProducerSingleConsumer = 0,
    SoFi_SingleProducerSingleConsumer = 1,
    FiFo_MultiProducerSingleConsumer = 2,
    SoFi_MultiProducerSingleConsumer = 3
};

template 
class VariantQueue
{
public:
    using fifo_t = variant,
                           concurrent::SoFi,
                           concurrent::ResizeableLockFreeQueue,
                           concurrent::ResizeableLockFreeQueue>;
    // ...

private:
    VariantQueueTypes m_type;
    fifo_t m_fifo;
};

fifo_t 是隊列底層結構類型,可能是 concurrent::FiFo 、 concurrent::SoFi 、 concurrent::ResizeableLockFreeQueue 之一,至于使用哪一種,由枚舉值 m_type 確定。這三個內部會依賴以下數據結構:

template 
struct NonZeroedBuffer
{
    struct alignas(ElementType) element_t
    {
        cxx::byte_t data[sizeof(ElementType)];
    };
    element_t value[Capacity];
};

上面這一結構本質就是一個數組,其元素類型類型為Element。

2 共享內存獲取

發(fā)送數據前,應用程序首先需要先獲取一塊合適大小的Chunk,往其中寫入數據,然后調用消息發(fā)送接口進行發(fā)送。

2.1 PublisherImpl::loan

職責:

獲取一塊共享內存,并調用構造函數進行初始化。

入參:

args:模板變參,用于調用待傳類型的構造函數,也可以不傳。

返回:

Sample類型實例,本質是對用戶可操作的共享內存段的封裝。

template 
template 
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
    return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}

整體代碼分析:

首先調用loanSample方法獲取共享內存,然后調用構造函數進行初始化,這里使用Placement new語法。需要指出的是,loanSample返回的是將用于存放用戶數據的首地址,而不是Chunk的首地址。

2.2 PublisherImpl::loanSample

職責:

分配共享內存,并將其轉換為Sample類型,并返回。

返回:

Sample類型實例。

template 
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
    static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};

    auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
    if (result.has_error())
    {
        return cxx::error(result.get_error());
    }
    else
    {
        return cxx::success>(convertChunkHeaderToSample(result.value()));
    }
}

整體代碼分析:

首先調用 tryAllocateChunk 獲得一塊共享內存,并構造Sample實例。

2.3 PublisherPortUser::tryAllocateChunk

職責:

分配共享內存,并將其轉換為Sample類型,并返回。

入參:

4個用于計算所需共享內存大小的參數,這里不展開介紹了。

返回值:

共享內存首地址(類型為 ChunkHeader * ,見 4.1 Chunk管理結構 )

cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
                                    const uint32_t userPayloadAlignment,
                                    const uint32_t userHeaderSize,
                                    const uint32_t userHeaderAlignment) noexcept
{
    return m_chunkSender.tryAllocate(
        getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}

整體代碼分析:

上述函數只是簡單地調用 ChunkSender tryAllocate 方法。

2.4 ChunkSender::tryAllocate

職責:

調用 MemoryManager的成員方法getChunk 得到共享內存塊或復用最后一次使用的共享內存塊。

入參:

同上(略)

返回值:

指向共享內存塊首地址的指針,類型為 ChunkHeader

template 
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
                                              const uint32_t userPayloadSize,
                                              const uint32_t userPayloadAlignment,
                                              const uint32_t userHeaderSize,
                                              const uint32_t userHeaderAlignment) noexcept
{
    const auto chunkSettingsResult =
        mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
    if (chunkSettingsResult.has_error())
    {
        return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
    }

    const auto& chunkSettings = chunkSettingsResult.value();
    const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();

    auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
    mepoo::ChunkHeader* lastChunkChunkHeader =
        lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;

    if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
    {
        /* * * * *  見代碼段2-4-1:復用最近一次分配的共享內存  * * * * */
    }
    else
    {
        /* * * * *  見代碼段2-4-2:分配一塊新的未使用的共享內存 * * * * */
    }
}

逐段代碼分析:

  • LINE 09 ~ LINE 17: 計算所需共享內存大小。

  • LINE 19 ~ LINE 30: 判斷最近一次分配的共享內存塊是否所有訂閱者都已讀取,并且大小超過所需大小,則復用最近一次分配的共享內存塊,否則新分配共享內存塊。

代碼段2-4-1:復用最近一次分配的共享內存

auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
    auto chunkSize = lastChunkChunkHeader->chunkSize();
    lastChunkChunkHeader->~ChunkHeader();
    new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
    lastChunkChunkHeader->setOriginId(originId);
    return cxx::success(lastChunkChunkHeader);
}
else
{
    return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}

整體代碼分析:

如果正在使用的共享內存塊未滿,則插入,并析構之前的數據,同時在這塊內存上構造新的 ChunkHeader ;否則返回錯誤。

代碼段2-4-2:分配一塊新的未使用的共享內存

auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);

if (!getChunkResult.has_error())
{
    auto& chunk = getChunkResult.value();

    // if the application allocated too much chunks, return no more chunks
    if (getMembers()->m_chunksInUse.insert(chunk))
    {
        // END of critical section
        chunk.getChunkHeader()->setOriginId(originId);
        return cxx::success(chunk.getChunkHeader());
    }
    else
    {
        // release the allocated chunk
        chunk = nullptr;
        return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
    }
}
else
{
    /// @todo iox-#1012 use cxx::error::from(E1); once available
    return cxx::error(cxx::into(getChunkResult.get_error()));
}

整體代碼分析:

調用MemoryManager的成員方法getChunk獲取共享內存塊,如果獲取成功,存入數組 m_chunksInUse 。如果獲取失敗或數組已滿,則返回獲取失敗,此時根據RAII原理, SharedChunk 的析構函數會自動將共享內存塊返還給 MemPool 。

m_chunksInUse 內部封裝的數組元素的類型為我們在 上一篇文章 中介紹的 ShmSafeUnmanagedChunk ,這個類型不具有引用計數,為什么退出作用域不會被析構?

為什么要存 m_chunksInUse 數組?原因如下:我們看到 tryAllocate 返回的是消息內存塊的指針,而消息發(fā)送的時候需要使用 SharedChunk ,我們無法將前者轉換為后者。所以,此處存入數組,消息發(fā)送函數中通過消息內存塊的指針查找對應數組元素,恢復出 SharedChunk 實例, 具體見3.3 。

3 消息發(fā)送邏輯

本質是往消息隊列推入消息描述結構 ShmSafeUnmanagedChunk 。

3.1 PublisherImpl::publish

職責:

上層應用程序調用此方法推送消息。

入參:

sample :用戶負載數據的封裝實例。

template 
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
    auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
    auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
    port().sendChunk(chunkHeader);
}

整體代碼分析:

上述代碼從 sample 中取出用戶負載數據指針,據此計算 Chunk 首地址,然后調用 sendChunk 進行發(fā)送。

根據用戶負載數據指針計算 Chunk 首地址其實就是減去一個偏移量,具體計算方法如下:

ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
    if (userPayload == nullptr)
    {
        return nullptr;
    }
    uint64_t userPayloadAddress = reinterpret_cast(userPayload);
    auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
    return reinterpret_cast(userPayloadAddress - *backOffset);
}

其中偏移放在payload之前,即: *backOffset 。

3.2 PublisherPortUser::sendChunk

職責:

發(fā)送用戶數據。

入參:

chunkHeader ChunkHeader 類型的指針, Chunk 的首地址。

void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);

    if (offerRequested)
    {
        m_chunkSender.send(chunkHeader);
    }
    else
    {
        m_chunkSender.pushToHistory(chunkHeader);
    }
}

整體代碼分析:

3.3 ChunkSender::send

職責:

發(fā)送用戶數據。

入參:

chunkHeader ChunkHeader 指針, Chunk 的首地址。

template 
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    uint64_t numberOfReceiverTheChunkWasDelivered{0};
    mepoo::SharedChunk chunk(nullptr);
    // BEGIN of critical section, chunk will be lost if the process terminates in this section
    if (getChunkReadyForSend(chunkHeader, chunk))
    {
        numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);

        getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
        getMembers()->m_lastChunkUnmanaged = chunk;
    }
    // END of critical section

    return numberOfReceiverTheChunkWasDelivered;
}

逐段代碼分析:

  • LINE 05 ~ LINE 07: 根據 chunkHeader 指針和 m_chunksInUse 數組,恢復 SharedChunk 實例;

  • LINE 09 ~ LINE 09: 調用基類的成員方法 deliverToAllStoredQueues 向各隊列發(fā)送(推入)消息;

  • LINE 11 ~ LINE 12: 更新 m_lastChunkUnmanaged 實例,以提升性能。

3.4 ChunkDistributor::deliverToAllStoredQueues

template 
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
    uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
    typename ChunkDistributorDataType::QueueContainer_t remainingQueues;

    /* * * * *  見代碼段3-3-1:向隊列發(fā)送消息,失敗入remainingQueues  * * * * */

    /* * * * *  見代碼段3-3-2:發(fā)送失敗的不斷嘗試重新發(fā)送  * * * * */

    addToHistoryWithoutDelivery(chunk);

    return numberOfQueuesTheChunkWasDeliveredTo;
}

整體代碼分析:

這部分沒有什么內容,主要實現在代碼段3-3-1和代碼段3-3-2。

代碼段3-3-1:

{
    {
    typename MemberType_t::LockGuard_t lock(*getMembers());

    bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
    // send to all the queues
    for (auto& queue : getMembers()->m_queues)
    {
        bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);

        if (pushToQueue(queue.get(), chunk))
        {
            ++numberOfQueuesTheChunkWasDeliveredTo;
        }
        else
        {
            if (isBlockingQueue)
            {
                remainingQueues.emplace_back(queue);
            }
            else
            {
                ++numberOfQueuesTheChunkWasDeliveredTo;
                ChunkQueuePusher_t(queue.get()).lostAChunk();
            }
        }
    }
}

整體代碼分析:

這段代碼整體上是遍歷所有訂閱者隊列,調用 pushToQueue 向消息隊列推入消息,實現消息發(fā)送。但是消息隊列的長度是有限的,如果由于訂閱者處理速度太慢,隊列滿了應該怎么處理,根據設置,可以選擇兩種應對策略:

  • 將隊列保存下來(LINE 17 ~ LINE 20),后續(xù)對這些隊列不斷嘗試發(fā)送,直到所有隊列推送成功,見代碼段3-3-2;

  • 將隊列標記為 有消息丟失 (LINE 22 ~ LINE 25):

template 
inline void ChunkQueuePusher::lostAChunk() noexcept
{
    getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}

代碼段3-3-2:不斷嘗試發(fā)送,直到所有消息發(fā)送成功

cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
    adaptiveWait.wait();
    {
        typename MemberType_t::LockGuard_t lock(*getMembers());

        /* * * * *  見代碼段3-3-3:與活躍隊列求交  * * * * */

        for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
        {
            if (pushToQueue(remainingQueues[i].get(), chunk))
            {
                remainingQueues.erase(remainingQueues.begin() + i);
                ++numberOfQueuesTheChunkWasDeliveredTo;
            }

            if (i == 0U)
            {
                break;
            }
        }
    }
}

整體代碼分析:

這部分代碼就是對剩余未發(fā)送成功的隊列進行重新發(fā)送,直到所有隊列發(fā)送成功。每輪嘗試中間會使用yield或sleep函數等待一段時間,以免不必要的性能浪費。同時,發(fā)送過程中,還會與當前活躍隊列求交,如下:

代碼段3-3-3:與活躍隊列求交

typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
                  memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);

auto iter = std::set_intersection(getMembers()->m_queues.begin(),
                              getMembers()->m_queues.end(),
                              remainingQueues.begin(),
                              remainingQueues.end(),
                              queueIntersection.begin(),
                              greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;

整體代碼分析:

上面這段代碼就是求解 remainingQueues 和當前活躍隊列 m_queues 交集,以免發(fā)生無限循環(huán)。 set_intersection 是C++標準庫函數,詳見: https://en.cppreference.com/w/cpp/algorithm/set_intersection

至此,消息發(fā)送的流程分析完畢。

4 小結

本文介紹了消息發(fā)布者獲取共享內存塊和發(fā)送邏輯,下文將介紹消息訂閱者的接收邏輯。

小編推薦閱讀

好特網發(fā)布此文僅為傳遞信息,不代表好特網認同期限觀點或證實其描述。

相關視頻攻略

更多

掃二維碼進入好特網手機版本!

掃二維碼進入好特網微信公眾號!

本站所有軟件,都由網友上傳,如有侵犯你的版權,請發(fā)郵件[email protected]

湘ICP備2022002427號-10 湘公網安備:43070202000427號© 2013~2025 haote.com 好特網