Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 82 additions & 32 deletions lib/Echidna/Campaign.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import Data.Set (Set)
import Data.Set qualified as Set
import Data.Text (Text, unpack)
import Data.Time (LocalTime)
import GHC.Clock (getMonotonicTimeNSec)
import Data.Vector qualified as V
import System.Random (mkStdGen)

Expand Down Expand Up @@ -97,13 +98,13 @@ runWorker
-> Int -- ^ Worker id starting from 0
-> [(FilePath, [Tx])]
-- ^ Initial corpus of transactions
-> Int -- ^ Test limit for this worker
-> WorkPool -- ^ Shared pool of remaining work
-> Maybe Text -- ^ Specified contract name
-> m (WorkerStopReason, WorkerState)
runWorker SymbolicWorker callback vm dict workerId initialCorpus _ name =
runSymWorker callback vm dict workerId initialCorpus name
runWorker FuzzWorker callback vm dict workerId initialCorpus testLimit _ =
runFuzzWorker callback vm dict workerId initialCorpus testLimit
runWorker FuzzWorker callback vm dict workerId initialCorpus workPool _ =
runFuzzWorker callback vm dict workerId initialCorpus workPool

runSymWorker
:: (MonadIO m, MonadThrow m, MonadReader Env m)
Expand Down Expand Up @@ -328,9 +329,10 @@ runFuzzWorker
-> Int -- ^ Worker id starting from 0
-> [(FilePath, [Tx])]
-- ^ Initial corpus of transactions
-> Int -- ^ Test limit for this worker
-> WorkPool -- ^ Shared pool of remaining work
-> m (WorkerStopReason, WorkerState)
runFuzzWorker callback vm dict workerId initialCorpus testLimit = do
runFuzzWorker callback vm dict workerId initialCorpus workPool = do
cfg <- asks (.cfg)
let
effectiveSeed = dict.defSeed + workerId
effectiveGenDict = dict { defSeed = effectiveSeed }
Expand All @@ -343,19 +345,45 @@ runFuzzWorker callback vm dict workerId initialCorpus testLimit = do
, totalGas = 0
, runningThreads = []
}
initialBatchSize = max 1 (cfg.campaignConf.seqLen * 10)

flip runStateT initialState $ do
flip evalRandT (mkStdGen effectiveSeed) $ do
lift callback
void $ replayCorpus vm initialCorpus
run
-- Claim initial batch from the shared pool
initialBatch <- liftIO $ claimWork workPool initialBatchSize
batchStartTime <- liftIO getMonotonicTimeNSec
batchStartCalls <- gets (.ncalls)
run initialBatch initialBatchSize batchStartTime batchStartCalls

where
run = do
targetBatchSecs = 15 :: Double

closeOptimizationTest test =
case test.testType of
OptimizationTest _ _ ->
test { Test.state = Large 0
, workerId = Just workerId
}
_ -> test

-- Close open optimization tests so they enter the shrink loop, or stop.
closeOptTestsOrStop testRefs tests remainingArgs = do
if any (\t -> isOpen t && isOptimizationTest t) tests then do
liftIO $ forM_ testRefs $ \testRef ->
atomicModifyIORef' testRef (\t -> (closeOptimizationTest t, ()))
lift callback >> uncurry4 run remainingArgs
else
lift callback >> pure TestLimitReached

uncurry4 f (a, b, c, d) = f a b c d

run remainingInBatch currentBatchSize batchStartTime batchStartCalls = do
testRefs <- asks (.testRefs)
tests <- liftIO $ traverse readIORef testRefs
CampaignConf{stopOnFail, shrinkLimit} <- asks (.cfg.campaignConf)
ncalls <- gets (.ncalls)
CampaignConf{stopOnFail, shrinkLimit, seqLen, testLimit} <- asks (.cfg.campaignConf)
ncallsBefore <- gets (.ncalls)

let
shrinkable test =
Expand All @@ -372,31 +400,57 @@ runFuzzWorker callback vm dict workerId initialCorpus testLimit = do
Failed _ -> True
_ -> False

closeOptimizationTest test =
case test.testType of
OptimizationTest _ _ ->
test { Test.state = Large 0
, workerId = Just workerId
}
_ -> test
-- Compute adaptive batch size based on throughput from previous batch
let adaptBatchSize = do
now <- liftIO getMonotonicTimeNSec
currentCalls <- gets (.ncalls)
let elapsedNs = now - batchStartTime
callsUsed = currentCalls - batchStartCalls
elapsedSecs = fromIntegral elapsedNs / 1e9 :: Double
floor_ = seqLen
ceil_ = max floor_ (testLimit `div` 10)
nextSize
| elapsedSecs > 0 =
let throughput = fromIntegral callsUsed / elapsedSecs
raw = round (throughput * targetBatchSecs) :: Int
in max floor_ (min ceil_ raw)
| otherwise =
-- elapsed=0 means very fast, double the batch
min ceil_ (currentBatchSize * 2)
newBatch <- liftIO $ claimWork workPool nextSize
pure (newBatch, nextSize)

let curBatchArgs = (0, currentBatchSize, batchStartTime, batchStartCalls)

if | stopOnFail && any final tests ->
lift callback >> pure FastFailed

-- we shrink first before going back to fuzzing
| any shrinkable tests ->
shrink >> lift callback >> run

-- no shrinking work, fuzz
| (null tests || any isOpen tests) && ncalls < testLimit ->
fuzz >> lift callback >> run

-- Test limit reached. Close any open optimization tests so they
shrink >> lift callback >> run remainingInBatch currentBatchSize batchStartTime batchStartCalls

-- no shrinking work, fuzz if we have budget
| (null tests || any isOpen tests) && remainingInBatch > 0 -> do
fuzz >> lift callback
ncallsAfter <- gets (.ncalls)
let used = ncallsAfter - ncallsBefore
remainingInBatch' = remainingInBatch - used
if remainingInBatch' <= 0 then do
-- Batch exhausted, adapt size and claim more work
(newBatch, nextSize) <- adaptBatchSize
if newBatch > 0 then do
newStartTime <- liftIO getMonotonicTimeNSec
newStartCalls <- gets (.ncalls)
run (remainingInBatch' + newBatch) nextSize newStartTime newStartCalls
else
closeOptTestsOrStop testRefs tests curBatchArgs
else
run remainingInBatch' currentBatchSize batchStartTime batchStartCalls

-- No remaining budget. Close any open optimization tests so they
-- enter the shrink loop above, same as other test types.
| ncalls >= testLimit && any (\t -> isOpen t && isOptimizationTest t) tests -> do
liftIO $ forM_ testRefs $ \testRef ->
atomicModifyIORef' testRef (\test -> (closeOptimizationTest test, ()))
lift callback >> run
| remainingInBatch <= 0 && any (\t -> isOpen t && isOptimizationTest t) tests ->
closeOptTestsOrStop testRefs tests curBatchArgs

-- no more work to do, exit
| otherwise ->
Expand All @@ -406,11 +460,7 @@ runFuzzWorker callback vm dict workerId initialCorpus testLimit = do

-- To avoid contention we only shrink tests that were falsified by this
-- worker. Tests are marked with a worker in 'updateOpenTest'.
--
-- TODO: This makes some workers run longer as they work less on their
-- test limit portion during shrinking. We should move to a test limit shared
-- between workers to avoid that. This way other workers will "drain"
-- the work queue.
-- Other workers will drain the shared work pool while this worker shrinks.
shrink = updateTests $ \test -> do
if test.workerId == Just workerId then
shrinkTest vm test
Expand Down
17 changes: 17 additions & 0 deletions lib/Echidna/Types/Campaign.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Echidna.Types.Campaign where

import Control.Concurrent (ThreadId)
import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Data.Text (Text)
import Data.Word (Word8, Word16)
import GHC.Conc (numCapabilities)
Expand Down Expand Up @@ -134,3 +135,19 @@ getNFuzzWorkers conf = maybe defaultN fromIntegral conf.workers
n = numCapabilities
maxN = max 1 n
defaultN = min 4 maxN -- capped at 4 by default

-- | A shared pool of remaining work (number of calls) for all fuzz workers.
-- Workers claim batches from this pool for natural load balancing: fast
-- workers simply claim more batches than slow ones.
newtype WorkPool = WorkPool (IORef Int)

-- | Create a new work pool with the given total number of calls.
newWorkPool :: Int -> IO WorkPool
newWorkPool total = WorkPool <$> newIORef total

-- | Atomically claim up to @batchSize@ calls from the pool.
-- Returns the number of calls actually claimed: 0 if the pool is exhausted
-- or if @batchSize@ is non-positive. The pool count never increases and
-- never drops below zero (assuming it was created non-negative).
claimWork :: WorkPool -> Int -> IO Int
claimWork (WorkPool ref) batchSize =
  atomicModifyIORef' ref $ \remaining ->
    -- Clamp to non-negative: without @max 0@, a zero/negative batchSize
    -- would produce a negative claim and *grow* the pool.
    let claimed = max 0 (min batchSize remaining)
    in (remaining - claimed, claimed)
26 changes: 13 additions & 13 deletions lib/Echidna/UI.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import Echidna.Output.Corpus (saveCorpusEvent)
import Echidna.Output.JSON qualified
import Echidna.Server (runSSEServer)
import Echidna.SourceAnalysis.Slither (isEmptySlitherInfo)
import Echidna.Types.Campaign
import Echidna.Types.Campaign hiding (claimWork)
import Echidna.Types.Config
import Echidna.Types.Corpus qualified as Corpus
import Echidna.Types.Coverage (coverageStats)
Expand Down Expand Up @@ -84,19 +84,17 @@ ui vm dict initialCorpus cliSelectedContract = do
Interactive | not terminalPresent -> NonInteractive Text
other -> other

-- Distribute over all workers, could be slightly bigger overall due to
-- ceiling but this doesn't matter
perWorkerTestLimit = ceiling
(fromIntegral conf.campaignConf.testLimit / fromIntegral nFuzzWorkers :: Double)

chunkSize = ceiling
(fromIntegral (length initialCorpus) / fromIntegral nFuzzWorkers :: Double)
corpusChunks = chunksOf chunkSize initialCorpus ++ repeat []

-- Shared work pool: all fuzz workers claim batches from this pool
workPool <- liftIO $ newWorkPool conf.campaignConf.testLimit

corpusSaverStopVar <- spawnListener (saveCorpusEvent env)

workers <- forM (zip corpusChunks [0..(nworkers-1)]) $
uncurry (spawnWorker env perWorkerTestLimit)
uncurry (spawnWorker env workPool)

case effectiveMode of
Interactive -> do
Expand Down Expand Up @@ -227,7 +225,7 @@ ui vm dict initialCorpus cliSelectedContract = do

where

spawnWorker env testLimit corpusChunk workerId = do
spawnWorker env workPool corpusChunk workerId = do
stateRef <- newIORef initialWorkerState

threadId <- forkIO $ do
Expand All @@ -239,7 +237,7 @@ ui vm dict initialCorpus cliSelectedContract = do
corpus = if workerType == SymbolicWorker then initialCorpus else corpusChunk
maybeResult <- timeout timeoutUsecs $
runWorker workerType (get >>= writeIORef stateRef)
vm dict workerId corpus testLimit cliSelectedContract
vm dict workerId corpus workPool cliSelectedContract
pure $ case maybeResult of
Just (stopReason, _finalState) -> stopReason
Nothing -> TimeLimitReached
Expand All @@ -250,14 +248,16 @@ ui vm dict initialCorpus cliSelectedContract = do

-- When a fuzz worker is interrupted by timeout, tests may not have
-- finished shrinking. Run a shrink-only pass outside the timeout using
-- the same worker loop (testLimit=0 means no fuzzing, only shrink).
-- the same worker loop with an exhausted pool (no fuzzing, only shrink).
-- (See github.com/crytic/echidna/issues/839)
case stopReason of
TimeLimitReached | workerType == FuzzWorker -> do
tests <- traverse readIORef env.testRefs
when (any needsShrinking tests) $ void $
runReaderT (runWorker FuzzWorker (get >>= writeIORef stateRef)
vm dict workerId [] 0 cliSelectedContract) env
when (any needsShrinking tests) $ do
emptyPool <- liftIO $ newWorkPool 0
void $
runReaderT (runWorker FuzzWorker (get >>= writeIORef stateRef)
vm dict workerId [] emptyPool cliSelectedContract) env
_ -> pure ()

time <- liftIO getTimestamp
Expand Down
3 changes: 2 additions & 1 deletion src/test/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ runContract f selectedContract cfg workerType = do

(vm, env, dict) <- prepareContract cfg (f :| []) buildOutput selectedContract seed

workPool <- newWorkPool cfg.campaignConf.testLimit
(_stopReason, finalState) <- flip runReaderT env $
runWorker workerType (pure ()) vm dict 0 [] cfg.campaignConf.testLimit selectedContract
runWorker workerType (pure ()) vm dict 0 [] workPool selectedContract

-- TODO: consider snapshotting the state so checking functions don't need to
-- be IO
Expand Down
Loading