{-# LANGUAGE NoMonomorphismRestriction, PatternGuards, CPP #-}
module General.Conduit(
module Data.Conduit, MonadIO, liftIO,
sourceList, sinkList, sourceLStr,
mapC, mapAccumC, filterC,
mapMC, mapAccumMC,
(|$|), pipelineC, groupOnLastC,
zipFromC, linesCR
) where
import Data.Void
import Data.Conduit
import Data.Conduit.List as C
import Data.Conduit.Binary as C
import Data.Maybe
import Control.Applicative
import Control.Monad.Extra
import Control.Exception
import qualified Data.ByteString.Char8 as BS
import Control.Concurrent.Extra hiding (yield)
import Control.Monad.IO.Class
import General.Str
import Prelude
-- | Apply a pure function to every value flowing through the conduit.
--   Thin alias for 'C.map' from "Data.Conduit.List".
mapC :: Monad m => (a -> b) -> ConduitM a b m ()
mapC = C.map
-- | Apply a monadic action to every value flowing through the conduit,
--   emitting each result downstream. Thin alias for 'C.mapM'.
mapMC :: Monad m => (a -> m b) -> ConduitM a b m ()
mapMC = C.mapM
-- | Map over the stream while threading an accumulator, returning the final
--   accumulator when the input is exhausted.
--
--   Unlike 'C.mapAccum', the step function takes the accumulator /first/ and
--   the stream element second. The accumulator is forced to weak head normal
--   form before every step so that thunks do not pile up on long streams.
mapAccumC :: Monad m => (s -> a -> (s, b)) -> s -> ConduitM a b m s
mapAccumC f = C.mapAccum (\x a -> a `seq` f a x)
-- | Monadic variant of a stateful map: thread an accumulator through the
--   stream with an effectful step, returning the final accumulator.
--
--   Unlike 'C.mapAccumM', the step function takes the accumulator /first/ and
--   the stream element second. The accumulator is forced to weak head normal
--   form before every step so that thunks do not pile up on long streams.
mapAccumMC :: Monad m => (s -> a -> m (s, b)) -> s -> ConduitM a b m s
mapAccumMC f = C.mapAccumM (\x a -> a `seq` f a x)
-- | Pass through only the values satisfying the predicate.
--   Thin alias for 'C.filter'.
filterC :: Monad m => (a -> Bool) -> ConduitM a a m ()
filterC = C.filter
-- | Pair every value in the stream with a counter, starting from the given
--   index and advancing with 'succ' for each element.
--   The final counter value is discarded.
zipFromC :: (Monad m, Enum i) => i -> ConduitM a (i, a) m ()
zipFromC = void . mapAccumC (\i x -> (succ i, (i, x)))
-- | Run two conduits over the same input via the 'ZipConduit' applicative
--   and return both of their results as a pair.
(|$|) :: Monad m => ConduitM i o m r1 -> ConduitM i o m r2 -> ConduitM i o m (r1,r2)
(|$|) a b = getZipConduit $ (,) <$> ZipConduit a <*> ZipConduit b
-- | Consume the entire input stream and return it as a list.
--   Thin alias for 'consume' from "Data.Conduit.List".
sinkList :: Monad m => ConduitM a o m [a]
sinkList = consume
-- | Collapse every run of consecutive values that share the same key
--   (as computed by the supplied function) down to the /last/ value of
--   that run. An empty input produces no output.
groupOnLastC :: (Monad m, Eq b) => (a -> b) -> ConduitM a a m ()
groupOnLastC op = do
    first <- await
    whenJust first $ \v -> f (op v) v
    where
        -- @f k v@: @v@ is the most recent value seen, with key @k@; it has not
        -- been emitted yet because a later value may still share its key.
        f k v = await >>= \next -> case next of
            -- End of input: the pending value is the last of its run.
            Nothing -> yield v
            Just v2 -> do
                let k2 = op v2
                -- Key changed, so the pending value closed its run.
                when (k /= k2) $ yield v
                f k2 v2
-- | Split the incoming bytes into lines (using 'C.lines') and strip a single
--   trailing carriage return from each line, if present.
linesCR :: Monad m => ConduitM BStr BStr m ()
linesCR = C.lines .| mapC dropCR
    where
        -- Remove one trailing '\r'; leave every other line untouched.
        dropCR x
            | Just (rest, '\r') <- BS.unsnoc x = rest
            | otherwise = x
-- | Emit the chunks of an 'LBStr' (as produced by 'lbstrToChunks')
--   one at a time.
sourceLStr :: Monad m => LBStr -> ConduitM i BStr m ()
sourceLStr = sourceList . lbstrToChunks
-- | Run the given sink on a separately forked thread, feeding it the values
--   received by this conduit through a channel, and return the sink's result.
--
--   The first argument is the initial number of semaphore permits, which
--   limits how far the producing side may run ahead of the forked consumer.
--
--   If the forked thread fails with an exception, that exception is thrown
--   (via 'throwTo') to the thread that called 'pipelineC'.
--
--   NOTE(review): permits are only returned by the forked loop. If the sink
--   finishes before consuming all of its input, nothing appears to release
--   further permits, so the producing side may block in 'waitQSem' - confirm
--   that all callers use sinks which drain their input.
pipelineC :: Int -> ConduitM o Void IO r -> ConduitM o Void IO r
pipelineC buffer sink = do
    sem <- liftIO $ newQSem buffer  -- permits available to the producer
    chan <- liftIO newChan          -- queued values; Nothing marks end of input
    bar <- liftIO newBarrier        -- receives the result of the sink
    me <- liftIO myThreadId
    liftIO $ flip forkFinally (either (throwTo me) (signalBarrier bar)) $ do
        runConduit $
            -- Re-emit queued values until the end-of-input marker arrives,
            -- handing a permit back for every entry read.
            (whileM $ do
                x <- liftIO $ readChan chan
                liftIO $ signalQSem sem
                whenJust x yield
                pure $ isJust x) .|
            sink
    -- Producer side: take a permit, then queue each incoming value.
    awaitForever $ \x -> liftIO $ do
        waitQSem sem
        writeChan chan $ Just x
    -- Tell the forked loop that the input is finished, then wait for the result.
    liftIO $ writeChan chan Nothing
    liftIO $ waitBarrier bar