-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathFlagger.hs
More file actions
148 lines (131 loc) · 5.46 KB
/
Flagger.hs
File metadata and controls
148 lines (131 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
{-# LANGUAGE GeneralizedNewtypeDeriving, NamedFieldPuns, DeriveDataTypeable #-}
module Flagger where
import Flag
import Stats
import Log
import F
import SocketProducer
import RawSocketProducer
import Network
import Control.Monad.Reader
import Control.Concurrent.STM
import System.IO
import qualified System.Timeout as T
import Text.Regex.TDFA.String
import Text.Regex.TDFA
import System.Exit
import System.Console.CmdArgs
import Data.Maybe
_MONITOR_PORT :: PortNumber
_MONITOR_PORT = 7776
_SOCKET_PORT :: PortNumber
_SOCKET_PORT = 7777
_RAW_SOCKET_PORT :: PortNumber
_RAW_SOCKET_PORT = 7778
producers :: [FlagProducer]
producers = [ socketProducer _SOCKET_PORT
, rawSocketProducer _RAW_SOCKET_PORT]
flagger :: String -> ([Flag] -> IO (Maybe [Bool])) -> IO ()
flagger regexp submit = do
let configCmd = Config { timeout = 10 &= help "submit timeout, seconds"
, interval = 5 &= help "wait interval, seconds"
, logFilename = "flagger.log" &= help "logfile"
, wflagFile = "flagger.wflag" &= help "wrong flags file"
, rflagFile = "flagger.rflag" &= help "right flags file"
, iflagFile = "flagger.iflag" &= help "invalid flags file"
} &= summary "Flagger"
config <- cmdArgs configCmd
tvarStats <- newTVarIO $ Stats 0 0 0
tvarFlags <- newTVarIO []
runF tvarStats "monitor" (logFilename config) $
do
logI "Starting ticker..."
_ <- forkF "ticker" $ ticker submit config tvarFlags
logI "Starting listener..."
_ <- forkF "listener" $ listener regexp config tvarFlags producers
monitor _MONITOR_PORT tvarStats tvarFlags
data Config
= Config { timeout :: Int -- submit timeout, seconds
, interval :: Int -- wait interval, seconds
, logFilename :: String -- log
, wflagFile :: String -- wrong flags
, rflagFile :: String -- right flags
, iflagFile :: String -- invalid flags
}
deriving (Show, Data, Typeable)
partitionWith :: [Bool] -> [a] -> ([a], [a])
partitionWith as bs = foldl (\(l, r) (b, f) ->
if b
then (f : l, r)
else (l, f : r)) ([], []) $ zip as bs
withAccept :: MonadIO m => Socket -> ((Handle, HostName, PortNumber) -> m b) -> m b
withAccept s ma = do
x@(h, _host, _port) <- liftIO $ accept s
r <- ma x
liftIO $ hFlush h >> hClose h
return r
monitor :: PortNumber -> TVar Stats -> TVar [Flag] -> F ()
monitor p tvStats tvFlags = do
logI $ "Monitoring on port " ++ show p
s <- liftIO $ listenOn (PortNumber p)
forever $ withAccept s $ \(h, n, _) -> do
logI $ "Connection from " ++ n
liftIO $ do
stats <- atomically $ readTVar tvStats
flags <- atomically $ readTVar tvFlags
hPutStr h (show (length flags, stats) ++ "\n")
ticker :: ([Flag] -> IO (Maybe [Bool])) -> Config -> TVar [Flag] -> F ()
ticker submit Config { interval, timeout, wflagFile, rflagFile } tvar
= do
wfh <- liftIO $ openFile wflagFile AppendMode
rfh <- liftIO $ openFile rflagFile AppendMode
forever $ do
flags <- liftIO . atomically $ do
fs <- readTVar tvar
if null fs
then retry
else writeTVar tvar [] >> return fs
bvar <- liftIO $ registerDelay interval
let nflags = length flags
logI $ "Submitting " ++ show nflags ++ " flags"
mbs <- liftIO $ T.timeout timeout (submit flags)
case mbs of
Nothing -> do
logW "Submission timed out, retrying..."
Just Nothing -> return ()
Just (Just bs) -> do
if length bs /= nflags
then logE $ "Submission function incorrect, returned list with " ++
show (length bs) ++ " elements instead of " ++ show nflags
else do
let (af, naf) = partitionWith bs flags
when (naf /= []) $ do
logE $ show (length naf) ++ " flags not accepted"
liftIO . hPutStr wfh $ concatMap ((++ "\n") . show) af
when (af /= []) $ do
logS $ show (length af) ++ " flags successfully submitted"
liftIO . hPutStr rfh $ concatMap ((++ "\n") . show) naf
liftIO . atomically $ readTVar bvar >>= \b -> if b then return () else retry
listener :: String -> Config -> TVar [Flag] -> [FlagProducer] -> F ()
listener flagRegexp Config { iflagFile } tv ps = do
ifh <- liftIO $ openFile iflagFile AppendMode
-- compile regexp
reg <- case compile blankCompOpt blankExecOpt flagRegexp of
Left err -> do
logE $ "Invalid regular expression \"" ++ flagRegexp ++ "\": " ++ err
liftIO $ exitFailure
Right reg -> return reg
-- start producers
chan <- liftIO newTChanIO
mapM_ (\(FlagProducer n f) -> forkF n $ f chan) ps
-- main loop
forever $ do
SrcFlags src fs <- liftIO . atomically $ readTChan chan
let bs = map (\(Flag f) -> either (const False) isJust . execute reg $ f) fs
(cfs, ifs) = partitionWith bs fs
logI $ "Received " ++ show (length bs) ++ " flags from '" ++ src ++ "'"
when (ifs /= []) $ do
logE $ show (length ifs) ++ " flags don't match the regexp \""
++ flagRegexp ++ "\""
liftIO . hPutStr ifh $ concatMap ((++ "\n") . show) ifs
liftIO . atomically $ modifyTVar tv (cfs ++)