Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 75 additions & 27 deletions contracts/streaming/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![no_std]

use core::fmt::Error;

use soroban_sdk::{
contract, contractimpl, contracttype, token, Address, Env, Vec,
Address, Env, Vec, contract, contracterror, contractimpl, contracttype, token
};

// ─── Storage Keys ────────────────────────────────────────────────────────────
Expand Down Expand Up @@ -60,6 +62,19 @@ pub struct CreateStreamParams {

// ─── Errors ──────────────────────────────────────────────────────────────────

#[contracterror]
#[derive(Copy, Clone, Debug)]
pub enum StreamError {
InvalidAmount = 1,
InvalidSchedule = 2,
InvalidCliff = 3,
StreamNotFound = 4,
StreamCancelled = 5,
Unauthorized = 6,
InvalidWithdrawAmount = 7,
StreamEnded = 8,
}

// ─── Events ───────────────────────────────────────────────────────────────────

#[soroban_sdk::contractevent]
Expand Down Expand Up @@ -104,21 +119,24 @@ impl StreamingContract {
/// `total_amount` of `token` via the token's `approve()` function.
///
/// Returns the new stream's ID.
pub fn create_stream(env: Env, sender: Address, params: CreateStreamParams) -> u64 {
pub fn create_stream(env: Env, sender: Address, params: CreateStreamParams) -> Result<u64, StreamError> {
sender.require_auth();

// ── Validate params ──────────────────────────────────────────────────
if params.total_amount <= 0 {
panic!("total_amount must be > 0");
return Err(StreamError::InvalidAmount)
}
if params.end_time <= params.start_time {
panic!("end_time must be > start_time");
return Err(StreamError::InvalidSchedule)

}
if params.cliff_time < params.start_time || params.cliff_time > params.end_time {
panic!("cliff_time must be between start_time and end_time");
return Err(StreamError::InvalidCliff)

}
if params.cliff_amount < 0 || params.cliff_amount > params.total_amount {
panic!("cliff_amount must be between 0 and total_amount");
return Err(StreamError::InvalidCliff)

}

let duration = (params.end_time - params.start_time) as i128;
Expand Down Expand Up @@ -168,7 +186,7 @@ impl StreamingContract {
StreamCreatedEvent { stream_id: id, deposited_amount: stream.deposited_amount }
.publish(&env);

id
Ok(id)
}

/// Top up an existing stream with additional funds.
Expand All @@ -178,21 +196,28 @@ impl StreamingContract {
///
/// The caller must have approved this contract to spend `additional_amount`
/// of the stream's token before calling.
pub fn top_up(env: Env, stream_id: u64, additional_amount: i128) {
let mut stream = Self::load_stream(&env, stream_id);
pub fn top_up(env: Env, stream_id: u64, additional_amount: i128) -> Result<(), StreamError> {
let mut stream = match Self::load_stream(&env, stream_id) {
Ok(stream) => stream,
Err(e) => return Err(e)
};

stream.sender.require_auth();

if stream.cancelled {
panic!("cannot top up a cancelled stream");
return Err(StreamError::StreamCancelled)

}

let now = env.ledger().timestamp();
if now >= stream.end_time {
panic!("cannot top up an ended stream");
return Err(StreamError::StreamEnded)

}

if additional_amount <= 0 {
panic!("additional_amount must be > 0");
return Err(StreamError::InvalidAmount)

}

// ── Send funds ───────────────────────────────────────────────────────
Expand Down Expand Up @@ -253,6 +278,8 @@ impl StreamingContract {
new_amount_per_second,
}
.publish(&env);

Ok(())
}

// ── Write: Withdraw ──────────────────────────────────────────────────────
Expand All @@ -261,20 +288,24 @@ impl StreamingContract {
///
/// Only the recipient can call this. Pass the exact amount to withdraw
/// (must be ≤ withdrawable amount). Use `get_withdrawable` to query first.
pub fn withdraw(env: Env, stream_id: u64, amount: i128) {
let mut stream = Self::load_stream(&env, stream_id);
pub fn withdraw(env: Env, stream_id: u64, amount: i128) -> Result<(), StreamError> {
// let mut stream = Self::load_stream(&env, stream_id);
let mut stream = match Self::load_stream(&env, stream_id) {
Ok(stream) => stream,
Err(e) => return Err(e)
};

stream.recipient.require_auth();

if stream.cancelled {
panic!("stream is cancelled");
return Err(StreamError::StreamCancelled)
}

let now = env.ledger().timestamp();
let withdrawable = Self::withdrawable_amount(&stream, now);

if amount <= 0 || amount > withdrawable {
panic!("invalid withdraw amount");
return Err(StreamError::InvalidAmount)
}

stream.withdrawn_amount += amount;
Expand All @@ -293,6 +324,8 @@ impl StreamingContract {
);

WithdrawEvent { stream_id, amount }.publish(&env);

Ok(())
}

// ── Write: Cancel ────────────────────────────────────────────────────────
Expand All @@ -301,13 +334,17 @@ impl StreamingContract {
///
/// Unlocked funds (as of now) go to the recipient.
/// Remaining locked funds are returned to the sender.
pub fn cancel(env: Env, stream_id: u64) {
let mut stream = Self::load_stream(&env, stream_id);
pub fn cancel(env: Env, stream_id: u64) -> Result<(), StreamError> {
// let mut stream = Self::load_stream(&env, stream_id);
let mut stream = match Self::load_stream(&env, stream_id) {
Ok(stream) => stream,
Err(e) => return Err(e)
};

stream.sender.require_auth();

if stream.cancelled {
panic!("stream already cancelled");
return Err(StreamError::StreamCancelled)
}

let now = env.ledger().timestamp();
Expand Down Expand Up @@ -349,20 +386,31 @@ impl StreamingContract {
sender_refund: sender_gets_back,
}
.publish(&env);

Ok(())
}

// ── Read: Stream data ────────────────────────────────────────────────────

/// Get a stream by ID.
pub fn get_stream(env: Env, stream_id: u64) -> Stream {
Self::load_stream(&env, stream_id)
pub fn get_stream(env: Env, stream_id: u64) -> Result<Stream, StreamError> {
// Self::load_stream(&env, stream_id)
let stream = match Self::load_stream(&env, stream_id) {
Ok(stream) => stream,
Err(e) => return Err(e)
};
Ok(stream)
}

/// Get the withdrawable amount for a stream at current ledger time.
pub fn get_withdrawable(env: Env, stream_id: u64) -> i128 {
let stream = Self::load_stream(&env, stream_id);
pub fn get_withdrawable(env: Env, stream_id: u64) -> Result<i128, StreamError> {
// let stream = Self::load_stream(&env, stream_id);
let stream = match Self::load_stream(&env, stream_id) {
Ok(stream) => stream,
Err(e) => return Err(e)
};
let now = env.ledger().timestamp();
Self::withdrawable_amount(&stream, now)
Ok(Self::withdrawable_amount(&stream, now))
}

/// Get all stream IDs where `address` is the sender.
Expand All @@ -386,17 +434,17 @@ impl StreamingContract {
/// Extend the TTL of a stream's persistent storage without modifying data.
/// Anyone can call this to keep a long-running stream alive.
pub fn bump_stream(env: Env, stream_id: u64) {
Self::load_stream(&env, stream_id);
let _ = Self::load_stream(&env, stream_id);
Self::extend_stream_ttl(&env, stream_id);
}

// ── Internal helpers ─────────────────────────────────────────────────────

fn load_stream(env: &Env, id: u64) -> Stream {
fn load_stream(env: &Env, id: u64) -> Result<Stream, StreamError> {
env.storage()
.persistent()
.get(&DataKey::Stream(id))
.unwrap_or_else(|| panic!("stream not found"))
.unwrap_or_else(|| return Err(StreamError::StreamNotFound))
}

/// Compute total unlocked amount at `now` (UNIX seconds).
Expand Down
16 changes: 8 additions & 8 deletions contracts/streaming/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ fn test_create_stream_with_cliff() {
}

#[test]
#[should_panic(expected = "end_time must be > start_time")]
#[should_panic(expected = "Error(Contract, #2)")]
fn test_create_stream_invalid_times() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand All @@ -143,7 +143,7 @@ fn test_create_stream_invalid_times() {
}

#[test]
#[should_panic(expected = "total_amount must be > 0")]
#[should_panic(expected = "Error(Contract, #1)")]
fn test_create_stream_zero_amount() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand Down Expand Up @@ -208,7 +208,7 @@ fn test_withdraw_full_after_end() {
}

#[test]
#[should_panic(expected = "invalid withdraw amount")]
#[should_panic(expected = "Error(Contract, #1)")]
fn test_withdraw_too_much() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand Down Expand Up @@ -292,7 +292,7 @@ fn test_cancel_midway() {
}

#[test]
#[should_panic(expected = "stream already cancelled")]
#[should_panic(expected = "Error(Contract, #5)")]
fn test_cancel_twice() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand All @@ -309,7 +309,7 @@ fn test_cancel_twice() {
}

#[test]
#[should_panic(expected = "stream is cancelled")]
#[should_panic(expected = "Error(Contract, #5)")]
fn test_withdraw_from_cancelled_stream() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand Down Expand Up @@ -454,7 +454,7 @@ fn test_top_up_mid_stream_recalculates_rate() {
}

#[test]
#[should_panic(expected = "cannot top up a cancelled stream")]
#[should_panic(expected = "Error(Contract, #5)")]
fn test_top_up_cancelled_stream_panics() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand All @@ -474,7 +474,7 @@ fn test_top_up_cancelled_stream_panics() {
}

#[test]
#[should_panic(expected = "cannot top up an ended stream")]
#[should_panic(expected = "Error(Contract, #8)")]
fn test_top_up_ended_stream_panics() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand All @@ -494,7 +494,7 @@ fn test_top_up_ended_stream_panics() {
}

#[test]
#[should_panic(expected = "additional_amount must be > 0")]
#[should_panic(expected = "Error(Contract, #1)")]
fn test_top_up_zero_amount_panics() {
let t = TestEnv::setup();
let now = 1_000_000u64;
Expand Down
Loading
Loading