[Feat] Asu: build and send ASU SQE requests #998

Open
yuanzhg078 wants to merge 2 commits into ModelEngine-Group:feature_26h1 from yuanzhg078:dev_asu_trans_pr_2

Conversation

@yuanzhg078
Contributor

Purpose

Implement ASU SQE request construction and the submit flow: create sub-batch requests, prepare send buffers, send through the configured ASU connections, and propagate send failures.

Modifications

Add SQE request builders for BatchStore, BatchRetrieve, Delete, Exist, and KeepAlive operations.
Add transport attr validation for kv_ns_id, dtype, dspec, lr, sc, kernel_count, and quiet_count before request submission.
Allocate response flag buffers and populate response buffer address/MR key fields for SQE requests.
Pack SQE requests through ProtocolManager and store send SGE/flag buffer metadata in TransportSubBatchContext.
Add submit-flow logic to create sub-batch requests from scheduled IO/key batches.
Add send-buffer preparation and Send invocation using configured kernel_count and quiet_count attrs.
Propagate request-build and send failures to sub-batch status and per-entry status.
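
For readers of this thread, the attr validation step could look roughly like the sketch below. It is not the code in this PR: `AttrMap` and `FindMissingAttr` are hypothetical names, and because the description does not state the valid value ranges, the sketch only checks that each listed attr is present.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical attr container (assumption): attr name -> integer value.
using AttrMap = std::map<std::string, std::uint64_t>;

// Returns the name of the first required attr that is missing,
// or an empty string when every required attr is present.
inline std::string FindMissingAttr(const AttrMap& attrs)
{
    // Attr names come from the PR description; value ranges are not
    // given there, so only presence is checked in this sketch.
    static const std::vector<std::string> kRequired = {
        "kv_ns_id", "dtype", "dspec", "lr", "sc", "kernel_count", "quiet_count"};
    for (const auto& name : kRequired) {
        if (attrs.find(name) == attrs.end()) { return name; }
    }
    return std::string();
}
```

Rejecting a request before any buffer is allocated keeps the failure path simple, since there is nothing to release yet.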

Test

Added SqeRequestTest and AsuSubmitFlowTest for request validation, SQE construction, sub-batch submission, send-buffer preparation, send attr handling, and failure propagation.

Introduce ASU sub-batch task state, IO scheduling limits, response status mapping, and completion finalization primitives.

Add focused tests for scheduler splitting, CQE/result status conversion, buffer release, and terminal task state handling.
Add request builders and attr validation for batch store, batch retrieve, delete, exist, and keep-alive SQEs.

Add submit-flow logic for sub-batch request creation, send-buffer preparation, configured send execution, and per-entry failure propagation.
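
As an illustration of the scheduler splitting mentioned in these commit messages, the sketch below cuts a batch of entries into index ranges that respect a per-sub-batch limit. `SplitIntoSubBatches` and its parameters are hypothetical; the actual IoScheduler limits in this PR may be expressed differently (for example in bytes rather than entry counts).

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Returns [begin, end) index ranges, each covering at most
// maxPerSubBatch entries. Hypothetical helper, not the PR's scheduler.
inline std::vector<std::pair<std::size_t, std::size_t>> SplitIntoSubBatches(
    std::size_t entryCount, std::size_t maxPerSubBatch)
{
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (maxPerSubBatch == 0) { return ranges; }  // an invalid limit yields no ranges
    for (std::size_t begin = 0; begin < entryCount; begin += maxPerSubBatch) {
        ranges.emplace_back(begin, std::min(entryCount, begin + maxPerSubBatch));
    }
    return ranges;
}
```

With 10 entries and a limit of 4, this yields the ranges [0, 4), [4, 8), and [8, 10).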
const auto& status = sendStatuses[index];
if (status.ok()) { continue; }

SetSubBatchSendFailed(subBatchContext, status);
Contributor

Does this implementation guarantee that the buffer slot can be reused and the channel inflight will be zeroed out after a send failure?

Contributor Author

Already added: the release now happens at the end of Completetask.
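
One way to make the guarantee asked about above hold on every failure path is a scope guard, sketched here. This is an assumption-laden illustration, not the PR's code: `SendResourceGuard`, `ChannelState`, and `SendWithGuard` are hypothetical names, and the real release happens in the completion path as the author describes.

```cpp
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical guard: runs the release callback on destruction
// unless Dismiss() was called first.
class SendResourceGuard {
public:
    explicit SendResourceGuard(std::function<void()> release) : release_(std::move(release)) {}
    ~SendResourceGuard()
    {
        if (release_) { release_(); }
    }
    SendResourceGuard(const SendResourceGuard&) = delete;
    SendResourceGuard& operator=(const SendResourceGuard&) = delete;
    // Hand ownership of the resources to the completion path.
    void Dismiss() { release_ = nullptr; }

private:
    std::function<void()> release_;
};

// Hypothetical per-channel bookkeeping (assumption).
struct ChannelState {
    std::size_t inflight = 0;
    bool slotInUse = false;
};

// sendSucceeds stands in for the result of the real Send call.
inline bool SendWithGuard(ChannelState& channel, bool sendSucceeds)
{
    channel.slotInUse = true;
    ++channel.inflight;
    SendResourceGuard guard([&channel] {
        channel.slotInUse = false;
        --channel.inflight;
    });
    if (!sendSucceeds) { return false; }  // guard releases the slot and inflight count
    guard.Dismiss();                      // success: completion handling releases later
    return true;
}
```

After a failed send the slot is free and the inflight count is back to its previous value; after a successful send both stay held until completion.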


inline bool IsEntryBatchOp(TransportOpType opType)
{
    return opType == TransportOpType::BATCH_LOAD || opType == TransportOpType::BATCH_STORE;
Contributor

Ensure that the opType passed in from the upper layer is consistent with the one defined here :)

Contributor Author

Done.
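
A possible way to keep the op-type check in sync with the upper layer is an exhaustive `switch`, sketched below. Only `BATCH_LOAD` and `BATCH_STORE` appear in the diff; the other enumerators are assumptions modeled on the operations named in the PR description, and the real `TransportOpType` may differ.

```cpp
// Hypothetical enum (assumption): only BATCH_LOAD and BATCH_STORE
// are confirmed by the diff under review.
enum class TransportOpType { BATCH_LOAD, BATCH_STORE, DELETE, EXIST, KEEP_ALIVE };

// Listing every enumerator without a default branch lets compilers
// that support -Wswitch warn when a new op type is added but not
// classified here.
inline bool IsEntryBatchOp(TransportOpType opType)
{
    switch (opType) {
        case TransportOpType::BATCH_LOAD:
        case TransportOpType::BATCH_STORE:
            return true;
        case TransportOpType::DELETE:
        case TransportOpType::EXIST:
        case TransportOpType::KEEP_ALIVE:
            return false;
    }
    return false;  // unreachable for valid enumerators
}
```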


std::uint64_t GetResponseBufferAddr(const ScatterGatherEntry& flagBuffer)
{
    return flagBuffer.addr;
Contributor

Why don't we just use flagBuffer.addr directly?

    return flagBuffer.addr;
}

std::uint32_t GetResponseMrKey(const ScatterGatherEntry& flagBuffer) { return flagBuffer.lkey; }
Contributor

Same as above: we can just use flagBuffer.lkey directly.
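
The two suggestions above amount to reading the fields at the call site instead of going through one-line accessors. A minimal sketch, assuming a `ScatterGatherEntry` with `addr` and `lkey` members as shown in the diff; the `length` member, `SqeResponseFields`, and `MakeResponseFields` are hypothetical.

```cpp
#include <cstdint>

// addr and lkey are taken from the diff; length is an assumption.
struct ScatterGatherEntry {
    std::uint64_t addr;
    std::uint32_t length;
    std::uint32_t lkey;
};

// Hypothetical holder for the response buffer fields of an SQE request.
struct SqeResponseFields {
    std::uint64_t respBufferAddr;
    std::uint32_t respMrKey;
};

// Reads the flag buffer fields directly, with no wrapper helpers.
inline SqeResponseFields MakeResponseFields(const ScatterGatherEntry& flagBuffer)
{
    return SqeResponseFields{flagBuffer.addr, flagBuffer.lkey};
}
```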


} // namespace

Status SubmitTaskRequests(const TransportTaskContext& ctx, const IoScheduler& ioScheduler,
Contributor

Can we move functions like this into the ASU transport class instead of passing so many attributes?
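
The refactor suggested here might look like the sketch below, where the scheduler becomes a member so the call site only passes the task context. Everything in it is a simplified stand-in: the real `IoScheduler`, `TransportTaskContext`, and `Status` types are not shown in this thread, and `AsuTransport` is an assumed class name.

```cpp
#include <cstddef>
#include <vector>

// Simplified stand-ins (assumptions) for the types named in the diff.
struct IoScheduler {
    std::size_t maxEntriesPerSubBatch;
};
struct TransportTaskContext {
    std::vector<int> entries;
};

// Hypothetical class: collaborators are members, so SubmitTaskRequests
// no longer needs them as parameters.
class AsuTransport {
public:
    explicit AsuTransport(const IoScheduler& scheduler) : ioScheduler_(scheduler) {}

    // Returns the number of sub-batches the task would be split into.
    // The real function returns a Status; a count keeps the sketch small.
    std::size_t SubmitTaskRequests(const TransportTaskContext& ctx) const
    {
        const std::size_t limit = ioScheduler_.maxEntriesPerSubBatch;
        if (limit == 0) { return 0; }
        return (ctx.entries.size() + limit - 1) / limit;  // ceiling division
    }

private:
    IoScheduler ioScheduler_;
};
```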

3 participants