Skip to content

[WIP] add cqe unpack#985

Open
yumingyue624 wants to merge 1 commit into
ModelEngine-Group:feature_26h1from
yumingyue624:protocol_pack
Open

[WIP] add cqe unpack#985
yumingyue624 wants to merge 1 commit into
ModelEngine-Group:feature_26h1from
yumingyue624:protocol_pack

Conversation

@yumingyue624
Copy link
Copy Markdown
Contributor

@yumingyue624 yumingyue624 commented May 28, 2026

Purpose

Implement BufferManager for slot-based memory management with IndexPool,
and unify SQE packing/CQE unpacking into a single KvProtocol abstraction.

Modifications

  1. Add BufferManager with IndexPool-based slot allocation, supporting
    HOST (malloc), HOST_PINNED (aclrtMallocHost + aclrtHostRegister),
    and ASCEND_DEVICE (aclrtMalloc) memory types via AscendBuffer.
  2. Unify Sqe/Cqe into KvProtocol base class with PackSqe/UnpackCqe
    interfaces, replacing separate SqeManager and CqeManager.
  3. Add ProtocolManager to manage send_buffer and flag_buffer instances,
    with PackRequest, UnpackResponse, and GetCid interfaces.
  4. Rename link_proto to link_protocol for naming consistency.
  5. Remove old sqe.h/cpp files, replaced by kv_protocol.h/cpp.

Test

BufferManagerTest (17 cases) and KvProtocolPackTest (10 cases) all passed.

@yumingyue624 yumingyue624 changed the base branch from develop to feature_26h1 May 28, 2026 11:28
@yumingyue624 yumingyue624 requested a review from nrj868 as a code owner May 28, 2026 11:28
Comment thread ucm/transport/kv/asu/trans/src/send_buffer.cpp Outdated
Comment thread ucm/transport/kv/asu/test/case/send_buffer_test.cc Outdated
Comment thread ucm/transport/kv/asu/trans/src/send_buffer.h Outdated
static constexpr std::size_t kAlignment = 512;
static constexpr std::size_t kMaxCIDCount = 65536; // 16-bit CID range
static constexpr std::size_t kMaxROBEntries = 65536;
static constexpr std::size_t kInvalidROBIndex = SIZE_MAX;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider defining this magic number (65536) as a named constant for better readability.


// Dwords 12-15: key[15:0] - 16 bytes key, low-byte aligned
std::size_t key_len = std::min(r.key.size(), static_cast<std::size_t>(16));
if (key_len > 0) { std::memcpy(&dwords[12], r.key.data(), key_len); }
if (key_len > 0) { std::memcpy(&target[12], r.key.data(), key_len); }
return Status::OK();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential buffer overflow risk. Consider using safer alternatives like strncpy or checking buffer sizes.


// Dwords 12-15: key[15:0] - 16 bytes key, low-byte aligned
std::size_t key_len = std::min(r.key.size(), static_cast<std::size_t>(16));
if (key_len > 0) { std::memcpy(&dwords[12], r.key.data(), key_len); }
if (key_len > 0) { std::memcpy(&target[12], r.key.data(), key_len); }
return Status::OK();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential buffer overflow risk. Consider using safer alternatives like strncpy or checking buffer sizes.

{
auto& r = static_cast<const KvBatchStoreRequest&>(req);
return (kSqeDwordCount + r.batch_number * kBatchEntryDwordCount) * sizeof(std::uint32_t);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using size_t or uint64_t for size calculations to avoid integer overflow.

Comment thread ucm/transport/kv/asu/trans/src/sqe.h Outdated
std::size_t PackedSize(const SqeRequest& req) const override
{
return kSqeDwordCount * sizeof(std::uint32_t);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using size_t or uint64_t for size calculations to avoid integer overflow.

Comment thread ucm/transport/kv/asu/trans/src/sqe.h Outdated
std::size_t ResponseSize(const SqeRequest& req) const override
{
return kCqeDwordCount * sizeof(std::uint32_t);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using size_t or uint64_t for size calculations to avoid integer overflow.

Comment thread ucm/transport/kv/asu/trans/src/sqe.h Outdated
std::size_t PackedSize(const SqeRequest& req) const override
{
return kSqeDwordCount * sizeof(std::uint32_t);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using size_t or uint64_t for size calculations to avoid integer overflow.

Comment thread ucm/transport/kv/asu/trans/src/send_buffer.cpp Outdated
Comment thread ucm/transport/kv/asu/trans/src/send_buffer.cpp Outdated
Comment thread ucm/transport/kv/asu/trans/src/sqe.cpp Outdated
}

Status KvStoreSqe::Validate() const
Status KvStoreSqe::Validate(const std::uint32_t* data) const
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Suggestion: After removing the size check from Validate, consider adding a null pointer check at the beginning: if (!data) return Status::Error(StatusCode::INVALID_ARGUMENT, "data pointer is null");. This prevents crashes instead of returning proper errors when called with null data.

Comment thread ucm/transport/kv/asu/trans/src/sqe.h Outdated
class KvRetrieveRequest : public SqeRequest {
public:
std::uint32_t cid{0};
std::uint16_t cid{0};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Suggestion: KvRetrieveRequest declares std::uint16_t cid{0}; but it inherits from SqeRequest which already has this field at line 43. This duplication is unnecessary and could cause confusion. Consider removing the redundant declaration.

if (!sqe) {
return Status::Error(StatusCode::INVALID_ARGUMENT, "unknown SQE opcode");
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Warning: sge.addr is std::uint64_t (a virtual address), but it's cast directly to std::uint32_t* via reinterpret_cast. On 64-bit systems, ensure the address fits in the pointer type. The proper cast sequence should be: reinterpret_cast<std::uint32_t*>(static_cast<uintptr_t>(sge.addr))

@yumingyue624 yumingyue624 force-pushed the protocol_pack branch 3 times, most recently from 8f8e29b to a411b7d Compare June 2, 2026 06:55
@yumingyue624 yumingyue624 changed the title [Fix] Correct cid field type to uint16_t to match protocol spec [Feat] Implementation of BufferManager and KvProtocol. Jun 2, 2026
@yumingyue624 yumingyue624 force-pushed the protocol_pack branch 2 times, most recently from 2d47e4f to 338d295 Compare June 2, 2026 11:11
@yumingyue624 yumingyue624 force-pushed the protocol_pack branch 2 times, most recently from 1a5a156 to 4e95153 Compare June 2, 2026 11:34
@yumingyue624 yumingyue624 changed the title [Feat] Implementation of BufferManager and KvProtocol. [WIP] add cqe unpack Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants