Nevertheless I've tidied up the code a little, particularly in terms of how streams from the radio can be associated with stream handlers.
One of the the truly nice things about Haskell is how elegantly concurrency and parallelism are integrated. Indeed as a pure, functional language Haskell lacks so many of the features that cause real problems with concurrency in other languages (e.g. destructive assignment, shared state, uncontrolled side-effects). While Haskell has many tools for concurrency, even the basic explicit concurrency (i.e. spawning a thread) is simpler and less burdened with issues. Haskell threads are lightweight 'green' threads that are scheduled onto hardware threads by the Haskell runtime. That means you can create hundreds or thousands, even tens of thousands of threads without gumming up the works. Moreover, Haskell threads behave very nicely when they are doing IO, they can be terminated without any problems and tasks like IO can be timed-out by just wrapping a 'timeout' function around some action. All of this lends itself beautifully to asynchronous handlers of the kind needed to react to data stream packets and notifications.
Philam's comms module now has a "startInStreamHandler" with the following type:
startInStreamHandler :: FlexRadioConnection -> InStreamSpec -> PacketHandler -> Maybe PortNumber ->
IO (Maybe FlexInStream)
This takes:
A radio connection, to identify which radio the stream will be from
A stream specification, to describe what kind of stream is required
A packet handler, a function to handle each packet of the stream
An optional port number, provided if a port other than the standard VRT port is to be used
If the stream could be created and the handler was successfully launched, then the function returns a "FlexInStream", which is the record that keeps details of the created stream (such as the stream ID used to identify it in the radio).
startInStreamHandler forks a new Haskell thread to handle incoming packets from either the standard port (on which multiple streams are multiplexed) or the 'private' port.
If the common VRT port is used, then startInStreamHandler will also create a queue onto which the shared standard port listener for the VRT port will demux and dispatch packets of the stream. Haskell's software transactional memory (STM) package has a TQueue data type that works very nicely for this, whose 'read' function blocks if there are no elements in the queue, providing the synchronisation needed to do work as soon as data (i.e. a new packet) is available.
Although it looks like a 'packet handler' would lack context and therefore be unable to do anything useful, Haskell's closures provide all the context you need for arbitrary context depending on purpose. The audio stream decoder featured in my prior post has now been factored into a PacketHandler, consisting of a literal packet handling function and a cleanup function:
-- | A handler for processing of an audio stream
audioPacketHandler :: IO PacketHandler
audioPacketHandler = do
!samples <- newEmptyMVar
startStreamEngine 24000 1000 1 samples
let
audioPacketHandler' :: VRTIFPacket -> IO ()
audioPacketHandler' packet =
case packet of
VRTIFDataPacket{..} -> do
-- Current stream payload is alternate L-R stereo pairs of IEEE-754 32-bit floating point samples.
-- These are delivered at 24000 L-R samples/s
-- We need to read Word32 samples from the packet, convert these to a float, average them and then
-- convert these again to the dynamic range of an Int16 for sending to OpenAL
let decodeStereoPairsToMonoInt16 :: Get Int16
decodeStereoPairsToMonoInt16 = do
left <- getWord32be >>= return . wordToFloat
right <- getWord32be >>= return . wordToFloat
let mono = round $ (left + right / 2) * fromIntegral (maxBound :: Int16)
return mono
allPairs = whileM (fmap not isEmpty) decodeStereoPairsToMonoInt16
(!result, _) = runGet allPairs vd_data
case result of
Left !err -> putStrLn $ "Error decoding audio data: " ++ err
Right !monoSamples -> putMVar samples monoSamples -- Send to audio engine
VRTIFContextPacket{..} ->
-- Error, we are not expecting a context packet here
errorM logComms $ "Context packet not expected in audio stream"
audioCleanup :: IO ()
audioCleanup =
-- Shut down the audio engine
putMVar samples [] -- Send empty samples list to engine
The function returns an action (signified by the IO) which contains the twin aspects of handling a single packet at a time and also cleaning up (the audioPacketHandler' and audioCleanup respectively). Both of these functions are defined in the context of the outer audioPacketHandler scope, which runs a sequence of actions to set up context before returning the PacketHandler action. In this case, these actions initialise the audio engine with its synchonisation object for sending samples. This is part of the closure of both PacketHandler functions, allowing them to utilise the context and tear it down properly when finished in the cleanup function.
The queue handler code, called by startInStreamHandler in the case that the stream is to be delivered on the shared standard VRT port does the following:
-- | Run a packet handler on a queue
runQueueHandler :: FlexRadioConnection -> FlexHex -> PacketHandler -> TQueue VRTIFPacket -> IO ThreadId
runQueueHandler connection radioStreamID PacketHandler{..} queue = do
-- The termination action
let onClose = do
noticeM logComms $ "Terminating handler for stream " ++ show radioStreamID
-- Terminate the stream in the radio
terminateStream connection radioStreamID
-- Perform packet handler cleanup
ph_cleanup
-- Launch handler
(flip forkFinally) (\_ -> onClose) $ forever $ do
!packet <- atomically $ readTQueue queue
ph_handler packet
The handler uses 'forkFinally' to launch a 'forever' action that will continue to get an item from the packet queue, whenever one becomes available, blocking otherwise. The packet handler is asked to deal with each packet when one arrives. An onClose function is defined which will terminate the stream if anything happens to it that causes this concurrent routine to terminate. This function calls terminateStream which does common termination actions, such as removing the stream record from the connection record, then gets the packet handler to clean itself up.
Besides this improvement to the queue handling, my attention has turned to the UI, per my comments in the last post.
I have again surveyed the Haskell GUI-binding landscape and on balance I have decided to stick with gtk, the one cross-platform toolkit I have used before. I had wondered about wxWidgets (and the wxHaskell binding), but the Haskell binding hasn't been updated for a while. Conversely, a gtk3 binding has literally just been uploaded to the Haskell package repository Hackage. Not only does that bring Haskell bang-up-to-date with the gtk framework, but there is also an additional "gtk3-mac-integration" package that will make the UI work well on a Mac by using the Quartz backend for gtk3.
So, having decided to stick with gtk I had to build the underlying gtk libraries on which the Haskell bindings are based. I had previously built gtk2, so the usual build adventure lay ahead on me.
Indeed, things were a bit awkward to get going and it took a few hours to get through all the gotchas.
In the end, this is what is required:
- Get ready for building the Haskell binding:
cabal install gtk2hs-buildtools - Install the gtk+3 base with homebrew:
brew install gtk+3
This will finish by printing a caveat about dbus, which must be followed now or later - Ensure you have libxml2 with the python bindings (I had it without, so...)
brew reinstall libxml2 --with-python
This will finish by printing a caveat about setting the PYTHONPATH, which must be used - Ensure you have gtk-doc installed
brew install gtk-doc - In order to build the Haskell bindings, you must make config visible:
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig - Unfortunately on Mavericks there are issues with clang, so gcc must be obtained:brew tap homebrew/versionsbrew install gcc48
- Cairo can be tricky, so better try the Haskell binding for it first:
cabal install cairo --with-gcc=gcc-4.8 - OK, now we can compile the read of the GTK 3 binding:
cabal install gtk3 --with-gcc=gcc-4.8 - Now we'll need the latest glade UI builder. So, download the latest glade sources and unpack the tarball. Glade is tricky to build clean. You have to do the following just to get configured:brew shexport PKG_CONFIG_PATH=/usr/local/lib/pkgconfigexport CC=gcc-4.8export PYTHONPATH=/usr/local/opt/libxml2/lib/python2.7/site-packages:$PYTHONPATH./configure
The 'brew sh' creates a shell with all the homebrew environment included.
We set gcc-4.8 as the compiler to use.
We set the PYTHONPATH, per the caveat from homebrew (see above). - Now we can actually compile and install glade:makesudo make install
- Now you have to do the LaunchAgent configuration per the other homebrew caveat, if you haven't done this already (i.e. copy the agent plist, register it and start it).
- Even now, you are not done. Glade will not yet run because it doesn't have compiled "schemas":
glib-compile-schmas /usr/local/share/glib-2.0/schemas - OK, maybe 13 is lucky for some. We can now finally run glade:
glade - \o/
Ugh. So I'm now ready to start hacking on the philam UI!
No comments:
Post a Comment