#43255 [BC-Medium] User Transactions might be lost due to missing Error Handling in Celestia RPC Client Requests `blob_submit` failure
Submitted on Apr 4th 2025 at 04:03:41 UTC by @perseverance for Attackathon | Movement Labs
Report ID: #43255
Report Type: Blockchain/DLT
Report severity: Medium
Target: https://github.com/immunefi-team/attackathon-movement/tree/main/protocol-units/da/movement/
Impacts:
Causing network processing nodes to process transactions from the mempool beyond set parameters
Description
Summary
User transactions received into the mempool are batched into a gRPC batch_write request and sent to the DA Lightnode, which then sends the blob data to Celestia. Because this communication happens over the network, it can fail for various reasons, and there is no retry mechanism for such failures. If the DA Lightnode runs in sequencer mode, the function tick_publish_blobs simply returns an error. As a result, user transactions can be lost and never executed, even though the RPC requests that submitted them returned success.
No error is reported to the users.
Technical Details
Vulnerability Location
When the DA Lightnode runs in sequencer mode, the vulnerability is in the tick_publish_blobs function. This function takes blocks out of the Receiver of an MPSC channel (a FIFO queue) and then calls submit_blocks to send the blob data to Celestia. If submit_blocks returns an error, tick_publish_blobs just returns that error: there is no mechanism to retry, and the blocks are not stored back in the database for a later attempt. In this case, the blocks, which can contain thousands of user transactions, are lost.
Note that although the DA Lightnode has a RocksDB database available, the project has not implemented any retry mechanism or any write-back of the blocks to the database to retry later in case of failure.
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/protocol/light-node/src/sequencer.rs#L219-L241
```rust
/// Ticks the block proposer to build blocks and submit them
async fn tick_publish_blobs(
    &self,
    receiver: &mut Receiver<Block>,
) -> Result<(), anyhow::Error> {
    // get some blocks in a batch
    let blocks = self.read_blocks(receiver).await?; // @audit blocks are taken out of the Receiver of the MPSC (FIFO) channel.
    if blocks.is_empty() {
        return Ok(());
    }
    // submit the blobs, resizing as needed
    let ids = blocks.iter().map(|b| b.id()).collect::<Vec<_>>();
    for block_id in &ids {
        info!(target: "movement_timing", %block_id, "submitting_block_batch");
    }
    self.submit_blocks(blocks).await?; // @audit calls submit_blocks. If it returns an error, the function returns the error, but the blocks are lost.
    for block_id in &ids {
        info!(target: "movement_timing", %block_id, "submitted_block_batch");
    }
    Ok(())
}
```
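To make the failure mode concrete, below is a minimal std-only model of the pattern above. It assumes a std::sync::mpsc channel in place of the tokio channel, a simplified hypothetical Block struct, and a hypothetical submit closure standing in for submit_blocks; none of these are the project's actual types. The point it shows is ownership: the blocks are moved out of the channel and into the submit call, so when that call fails there is no copy left anywhere.

```rust
use std::sync::mpsc::Receiver;

/// Simplified, hypothetical stand-in for the light node's `Block`.
#[derive(Debug, Clone, PartialEq)]
pub struct Block {
    pub id: u64,
    pub transactions: Vec<String>,
}

/// Drains up to `max` blocks from the channel without blocking,
/// playing the role of `read_blocks` in the audited code.
pub fn read_blocks(receiver: &Receiver<Block>, max: usize) -> Vec<Block> {
    let mut blocks = Vec::new();
    while blocks.len() < max {
        match receiver.try_recv() {
            Ok(block) => blocks.push(block),
            Err(_) => break, // channel empty (or disconnected)
        }
    }
    blocks
}

/// Same shape as the audited `tick_publish_blobs`: the drained blocks are
/// passed by value to `submit`; on error the function only propagates it,
/// so the blocks are dropped and cannot be retried.
pub fn tick_publish_blobs<F>(receiver: &Receiver<Block>, mut submit: F) -> Result<(), String>
where
    F: FnMut(Vec<Block>) -> Result<(), String>,
{
    let blocks = read_blocks(receiver, 16);
    if blocks.is_empty() {
        return Ok(());
    }
    submit(blocks)?; // `blocks` was moved into `submit`; on Err it is already gone
    Ok(())
}
```

For example, sending one block into the channel and then calling tick_publish_blobs with a submitter that always returns Err yields the error and leaves the channel empty, so the block is unrecoverable.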
Detailed Explanation
Life cycle of a user transaction submission
The code flow for handling a user transaction submission request to the Sequencer is described in detail in report 43188.
In the Sequencer, the blobs containing user transactions are sent in the function spawn_write_next_transaction_batch: for each block_building_parameters interval (1 second in the default configuration), the transactions are sent to the DA Lightnode in a gRPC request.
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/networks/movement/movement-full-node/src/node/tasks/transaction_ingress.rs#L97-L125
```rust
async fn spawn_write_next_transaction_batch(
    &mut self,
) -> Result<ControlFlow<(), ()>, anyhow::Error> {
    // SNIP
    if transactions.len() > 0 {
        info!(
            target: "movement_timing",
            batch_id = %batch_id,
            transaction_count = transactions.len(),
            "built_batch_write"
        );
        let batch_write = BatchWriteRequest { blobs: transactions };
        let mut buf = Vec::new();
        batch_write.encode_raw(&mut buf);
        info!("batch_write size: {}", buf.len());
        // spawn the actual batch write request in the background
        let mut da_light_node_client = self.da_light_node_client.clone();
        tokio::spawn(async move {
            match da_light_node_client.batch_write(batch_write.clone()).await {
                Ok(_) => {
                    info!(
                        target: "movement_timing",
                        batch_id = %batch_id,
                        "batch_write_success"
                    );
                    return;
                }
                Err(e) => { // @audit-issue if it returns an error, the batch_write data is lost. There is no retry mechanism, and spawn_write_next_transaction_batch still returns Ok.
                    warn!("failed to write batch to DA: {:?} {:?}", e, batch_id);
                }
            }
        });
    }
    Ok(Continue(()))
    // SNIP
}
```
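The fire-and-forget shape above can be modeled with std threads to show why the caller never learns about the failure. This is a sketch under stated assumptions: std::thread::spawn replaces tokio::spawn, batch_write is a hypothetical stub that always fails, and a failure counter replaces the warn! log. The JoinHandle is returned only so the example can wait deterministically; the original discards the task handle.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stub for `da_light_node_client.batch_write` that always fails,
/// e.g. because the DA light node cannot reach Celestia.
fn batch_write(_blobs: &[Vec<u8>]) -> Result<(), String> {
    Err("DA light node unavailable".to_string())
}

/// Models `spawn_write_next_transaction_batch`: the write runs in the
/// background, a failure is only recorded (the original only logs it),
/// and the caller always receives `Ok`.
fn spawn_write_batch(
    blobs: Vec<Vec<u8>>,
    failures: Arc<AtomicUsize>,
) -> (Result<(), String>, JoinHandle<()>) {
    let handle = thread::spawn(move || {
        if let Err(e) = batch_write(&blobs) {
            eprintln!("failed to write batch to DA: {e}");
            failures.fetch_add(1, Ordering::SeqCst);
            // `blobs` is dropped here; nothing retries or re-queues it.
        }
    });
    (Ok(()), handle)
}
```

The caller's result is Ok before the background write has even run, so any upstream success signal is decoupled from whether the data reached the DA layer.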
The DA Lightnode can run in two modes, blocky or sequencer:
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/protocol/light-node/README.md#L1-L6
This report focuses on the sequencer mode.
Sequencer mode of DA Lightnode
If the DA Lightnode runs in sequencer mode, the flow is more involved.
Blob data received from the mempool is written to the RocksDB mempool.
Transactions are then popped from the RocksDB mempool to build a block. This happens on each tick.
In the Sequencer, the number of transactions in a block is limited by the configurable block_size. The default maximum block size is 2048.
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/sequencing/memseq/sequencer/src/lib.rs#L101-L131
```rust
/// Waits for the next block to be built, either when the block size is reached or the building time expires.
async fn wait_for_next_block(&self) -> Result<Option<Block>, anyhow::Error> {
    let mut transactions = Vec::with_capacity(self.block_size as usize);
    let now = Instant::now();
    loop {
        let current_block_size = transactions.len() as u32;
        if current_block_size >= self.block_size {
            break;
        }
        let remaining = self.block_size - current_block_size;
        let mut transactions_to_add = self.mempool.pop_transactions(remaining as usize).await?; // @audit pop_transactions removes transactions from the RocksDB mempool
        transactions.append(&mut transactions_to_add);
        // sleep to yield to other tasks and wait for more transactions
        tokio::task::yield_now().await;
        if now.elapsed().as_millis() as u64 > self.building_time_ms {
            break;
        }
    }
    if transactions.is_empty() {
        Ok(None)
    } else {
        let new_block =
            self.build_next_block(block::BlockMetadata::default(), transactions).await?;
        Ok(Some(new_block))
    }
}
```
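The key property of this loop for the bug is that pop_transactions is destructive: once a block is built, its transactions exist only inside that block. The sketch below mirrors the loop shape with a hypothetical in-memory Mempool (a VecDeque standing in for the RocksDB mempool) and std::thread::yield_now in place of the tokio yield; it is an illustration, not the project's mempool API.

```rust
use std::collections::VecDeque;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical in-memory stand-in for the RocksDB-backed mempool.
pub struct Mempool {
    queue: VecDeque<u64>,
}

impl Mempool {
    pub fn new(txs: impl IntoIterator<Item = u64>) -> Self {
        Mempool { queue: txs.into_iter().collect() }
    }

    /// Removes and returns up to `n` transactions, like `pop_transactions`:
    /// once popped, they no longer exist in the mempool.
    pub fn pop_transactions(&mut self, n: usize) -> Vec<u64> {
        let take = n.min(self.queue.len());
        self.queue.drain(..take).collect()
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }
}

/// Same loop shape as `wait_for_next_block`: fill up to `block_size`,
/// or stop once `building_time` has elapsed.
pub fn wait_for_next_block(
    mempool: &mut Mempool,
    block_size: usize,
    building_time: Duration,
) -> Option<Vec<u64>> {
    let mut transactions = Vec::with_capacity(block_size);
    let start = Instant::now();
    loop {
        if transactions.len() >= block_size {
            break;
        }
        let remaining = block_size - transactions.len();
        transactions.append(&mut mempool.pop_transactions(remaining));
        thread::yield_now(); // stands in for `tokio::task::yield_now().await`
        if start.elapsed() > building_time {
            break;
        }
    }
    if transactions.is_empty() { None } else { Some(transactions) }
}
```

With 10 queued transactions and the default block size of 2048, the block holds all 10 and the mempool is left empty, which is exactly the state from which a later submission failure cannot recover.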
The built block is then sent into the Sender of a multi-producer, single-consumer (MPSC) channel.
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/protocol/light-node/src/sequencer.rs#L142-L162
```rust
async fn tick_build_blocks(&self, sender: Sender<Block>) -> Result<(), anyhow::Error> {
    let memseq = self.memseq.clone();
    // this has an internal timeout based on its building time
    // so in the worst case scenario we will roughly double the internal timeout
    let uid = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
    debug!(target: "movement_timing", uid = %uid, "waiting_for_next_block",);
    let block = memseq.wait_for_next_block().await?;
    match block {
        Some(block) => {
            info!(target: "movement_timing", block_id = %block.id(), uid = %uid, transaction_count = block.transactions().len(), "received_block");
            sender.send(block).await?; // @audit send Block built to Sender of a MPSC Channel.
            Ok(())
        }
        None => {
            // no transactions to include
            debug!(target: "movement_timing", uid = %uid, "no_transactions_to_include");
            Ok(())
        }
    }
}
```
In tick_publish_blobs, blocks are taken out of the Receiver of the MPSC (FIFO) channel, and submit_blocks is then called to send them to Celestia through a default_client.blob_submit request.
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/protocol/light-node/src/sequencer.rs#L219-L241
```rust
/// Ticks the block proposer to build blocks and submit them
async fn tick_publish_blobs(
    &self,
    receiver: &mut Receiver<Block>,
) -> Result<(), anyhow::Error> {
    // get some blocks in a batch
    let blocks = self.read_blocks(receiver).await?; // @audit blocks are taken out of the Receiver of the MPSC (FIFO) channel.
    if blocks.is_empty() {
        return Ok(());
    }
    // submit the blobs, resizing as needed
    let ids = blocks.iter().map(|b| b.id()).collect::<Vec<_>>();
    for block_id in &ids {
        info!(target: "movement_timing", %block_id, "submitting_block_batch");
    }
    self.submit_blocks(blocks).await?; // @audit calls submit_blocks. If it returns an error, the function returns the error, but the blocks are lost.
    for block_id in &ids {
        info!(target: "movement_timing", %block_id, "submitted_block_batch");
    }
    Ok(())
}
```
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/providers/celestia/src/da/mod.rs#L42-L52
```rust
/// Submits a CelestiaBlob to the Celestia node.
pub async fn submit_celestia_blob(&self, blob: CelestiaBlob) -> Result<u64, anyhow::Error> {
    let config = TxConfig::default();
    // config.with_gas(2);
    let height = self.default_client.blob_submit(&[blob], config).await.map_err(|e| { // @audit call default_client.blob_submit to submit Blob
        error!(error = %e, "failed to submit the blob");
        anyhow::anyhow!("Failed submitting the blob: {}", e)
    })?;
    Ok(height)
}
```
Since the DA Lightnode communicates with Celestia through the Celestia RPC client, blob_submit can fail for various reasons, for example a fault in the Celestia light node that interrupts the connection, or an internet connectivity issue.
The issue might be temporary, but the node has no mechanism to retry the blob_submit request. The code simply drops all the transactions: the blocks were built from transactions that have already been removed from the RocksDB database, so the blocks are simply lost.
In this case, the protocol could implement a retry mechanism to avoid the potential loss of user transactions. Since RocksDB is available, on error the blocks could also be stored back in the RocksDB database to be retried later.
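One possible shape for the suggested retry is a bounded exponential-backoff wrapper around the submission call. The sketch below is std-only and synchronous; retry_with_backoff, its parameters, and the attempt limit are hypothetical choices, and in the async light node the blocking sleep would be replaced by an async timer such as tokio::time::sleep.

```rust
use std::thread;
use std::time::Duration;

/// Calls `op` up to `max_attempts` times, doubling the delay after each
/// failure (exponential backoff). Returns the first success, or the last error.
pub fn retry_with_backoff<T, E, F>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: F,
) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    assert!(max_attempts > 0, "max_attempts must be at least 1");
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            // Transient failure: wait, grow the delay, and try again.
            Err(_) => {
                thread::sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

An in-place retry only bridges short outages such as a brief connection drop. If every attempt fails, the blocks still need to be kept somewhere durable, which is why a write-back to RocksDB is also worth considering.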
Blocky Mode of DA Lightnode
If the DA Lightnode runs in blocky mode, the bug is, in short, similar to bug 43188.
The code that sends the blob to Celestia is here:
https://github.com/immunefi-team/attackathon-movement/blob/a2790c6ac17b7cf02a69aea172c2b38d2be8ce00/protocol-units/da/movement/protocol/light-node/src/passthrough.rs#L229-L248
```rust
/// Batch write blobs.
async fn batch_write(
    &self,
    request: tonic::Request<BatchWriteRequest>,
) -> std::result::Result<tonic::Response<BatchWriteResponse>, tonic::Status> {
    let blobs = request.into_inner().blobs;
    for data in blobs {
        let blob = InnerSignedBlobV1Data::now(data.data)
            .try_to_sign(&self.signer)
            .await
            .map_err(|e| tonic::Status::internal(format!("Failed to sign blob: {}", e)))?;
        self.da
            .submit_blob(blob.into()) // @audit call da.submit_blob to submit Blob to Celestia.
            .await
            .map_err(|e| tonic::Status::internal(e.to_string()))?;
    }
    // * We are currently not returning any blobs in the response.
    Ok(tonic::Response::new(BatchWriteResponse { blobs: vec![] }))
}
```
It uses the same Celestia RPC client as in sequencer mode.
So if blob_submit returns an error, the error is propagated back to the Sequencer as a gRPC error response.
The behavior is then similar to report 43188.
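Reading the passthrough loop above literally, blobs are submitted one at a time and the first failure aborts the request through the ? operator. The std-only model below illustrates that control flow; batch_write here is a simplified hypothetical function and submit_blob is a closure standing in for self.da.submit_blob, with a plain String replacing tonic::Status. It shows that blobs before the failing one were already submitted, while the failing blob and everything after it were not, and the caller only sees a single error.

```rust
/// Models the loop in the passthrough `batch_write`: each blob is submitted
/// in turn, and the first error is returned immediately via `?`.
/// `submit_blob` is a hypothetical stand-in for `self.da.submit_blob`.
pub fn batch_write<F>(blobs: Vec<Vec<u8>>, mut submit_blob: F) -> Result<(), String>
where
    F: FnMut(&[u8]) -> Result<(), String>,
{
    for data in blobs {
        // Mirrors `.map_err(|e| tonic::Status::internal(e.to_string()))?`
        submit_blob(&data[..]).map_err(|e| format!("internal: {e}"))?;
    }
    Ok(())
}
```

On the Sequencer side, this error reaches the Err arm of the spawned task in spawn_write_next_transaction_batch, which only logs it.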
Severity Assessment
Bug Severity: Medium
Impact
High: Causing network processing nodes to process transactions from the mempool beyond set parameters
User transactions are dropped silently. No error message is returned, and there is no recovery mechanism.
Likelihood: Low to Medium
The issue can happen when a connection problem between the DA Lightnode and Celestia causes blob_submit requests to fail. The probability of this is low to medium.
Since the main purpose of the DA Lightnode is data availability and RocksDB is available, I think this missing error handling warrants a severity of Medium. But I am open to adjusting the severity based on the assessment of the Movement and Immunefi teams.
Recommendations
The protocol should implement a retry mechanism to avoid the potential loss of user transactions. Since RocksDB is available, on error the protocol can also write the blocks back to the RocksDB database so they can be retried later.
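A minimal sketch of the write-back idea follows, assuming a hypothetical PendingStore whose in-memory VecDeque stands in for a RocksDB column family, and a hypothetical Block type. The submitter borrows the batch instead of consuming it, so on failure the batch can be put back at the front of the queue in its original order rather than dropped.

```rust
use std::collections::VecDeque;

/// Hypothetical block type for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct Block {
    pub id: u64,
}

/// Hypothetical pending-block store; a VecDeque stands in for RocksDB.
pub struct PendingStore {
    blocks: VecDeque<Block>,
}

impl PendingStore {
    pub fn new() -> Self {
        PendingStore { blocks: VecDeque::new() }
    }

    pub fn push_back(&mut self, block: Block) {
        self.blocks.push_back(block);
    }

    pub fn len(&self) -> usize {
        self.blocks.len()
    }

    /// Takes up to `max` blocks from the front, oldest first.
    pub fn take_batch(&mut self, max: usize) -> Vec<Block> {
        let n = max.min(self.blocks.len());
        self.blocks.drain(..n).collect()
    }

    /// Puts a failed batch back at the front, preserving its original order.
    pub fn requeue_front(&mut self, batch: Vec<Block>) {
        for block in batch.into_iter().rev() {
            self.blocks.push_front(block);
        }
    }
}

/// Publishes one batch; on failure the batch is written back instead of dropped.
pub fn tick_publish_with_writeback<F>(store: &mut PendingStore, mut submit: F) -> Result<(), String>
where
    F: FnMut(&[Block]) -> Result<(), String>,
{
    let batch = store.take_batch(16);
    if batch.is_empty() {
        return Ok(());
    }
    if let Err(e) = submit(&batch) {
        store.requeue_front(batch); // keep the blocks for the next tick
        return Err(e);
    }
    Ok(())
}
```

Requeueing at the front keeps FIFO ordering for the next attempt. A variant that keeps blocks in the store and deletes them only after blob_submit succeeds would additionally survive a process crash between popping and submitting.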
Proof of Concept
Step 1: Set up a scenario with 10 transactions in the RocksDB database, from which a block containing those 10 transactions is built and placed in the MPSC Receiver channel.
Step 2: Mock the DA Lightnode's Celestia client so that default_client.blob_submit returns an error.
Step 3: Call the function tick_publish_blobs.
Output: tick_publish_blobs simply drops all the transactions and returns an error.
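The three PoC steps can be reproduced end to end without Celestia in a std-only model. This is not the project's test harness: a VecDeque stands in for the RocksDB mempool, std::sync::mpsc stands in for the tokio channel, and an always-failing closure stands in for the mocked blob_submit; build_block, publish_with_failing_submit, and surviving_transactions are hypothetical helpers.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical simplified block: just the transactions it carries.
#[derive(Debug)]
pub struct Block {
    pub transactions: Vec<u64>,
}

/// Step 1: pop every transaction out of the mempool (destructive, like
/// `pop_transactions`) and send the resulting block into the channel.
pub fn build_block(mempool: &mut VecDeque<u64>, sender: &Sender<Block>) {
    let transactions: Vec<u64> = mempool.drain(..).collect();
    if !transactions.is_empty() {
        sender.send(Block { transactions }).expect("receiver alive");
    }
}

/// Steps 2 and 3: drain the channel and pass the blocks to a submitter that
/// always fails, standing in for a mocked `blob_submit`.
pub fn publish_with_failing_submit(receiver: &Receiver<Block>) -> Result<(), String> {
    let blocks: Vec<Block> = receiver.try_iter().collect();
    if blocks.is_empty() {
        return Ok(());
    }
    let submit = |_blocks: Vec<Block>| -> Result<(), String> {
        Err("mocked blob_submit failure".to_string())
    };
    submit(blocks)?; // the blocks are consumed and dropped on failure
    Ok(())
}

/// Counts transactions that are still recoverable anywhere in the pipeline.
pub fn surviving_transactions(mempool: &VecDeque<u64>, receiver: &Receiver<Block>) -> usize {
    mempool.len() + receiver.try_iter().map(|b| b.transactions.len()).sum::<usize>()
}
```

After building a block from 10 transactions and running the failing publish step, the publish call returns an error and zero of the 10 transactions remain in either the mempool or the channel.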
The mermaid sequence diagram below and the attached PNG illustrate the bug.
```mermaid
sequenceDiagram
    participant User
    participant MovementSequencer
    participant DA_Lightnode
    participant CelestiaLightnode
    Note over User,MovementSequencer: 1. User submits transactions
    User->>MovementSequencer: Submit transactions
    MovementSequencer-->>User: Return success
    MovementSequencer->>DA_Lightnode: Send transactions via gRPC
    DA_Lightnode->>DA_Lightnode: Store transactions in internal RocksDB
    Note over DA_Lightnode: 2. Build blocks from transactions
    DA_Lightnode->>DA_Lightnode: Pop (delete) transactions from RocksDB
    DA_Lightnode->>DA_Lightnode: Build blocks
    DA_Lightnode->>DA_Lightnode: Send blocks through internal MPSC channel
    Note over DA_Lightnode,CelestiaLightnode: 3. Submit blocks to Celestia
    DA_Lightnode->>CelestiaLightnode: Call blob_submit
    alt blob_submit fails
        CelestiaLightnode-->>DA_Lightnode: Return error
        Note over DA_Lightnode: Blocks permanently lost
        Note over MovementSequencer: No error notification received
        Note over User: No error notification received
    end
```