We will pass in println as the function to call when we have the value:

(take! simplechan println)
; 1
(take! simplechan println)
; 2
(take! simplechan println)
; 3
(take! simplechan println)
; nil
(take! simplechan println)
; nil

The key thing to remember is that both take! and put! send or receive one item at a time.
How much data can we put on the channel?

; Create a new channel again so that we know it is empty
(def simplechan (chan))
; What happens if we try and call it 1024 times?
(dotimes [i 1024] (put! simplechan i))
; nil (seems to be working OK)
; What if we try adding on one more value?
(put! simplechan "Exceeded Capacity")
; Execution error (AssertionError) at clojure.core.async.impl.channels.ManyToManyChannel/put_BANG_ (channels.clj:152).
; Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
; (< (.size puts) impl/MAX-QUEUE-SIZE)

It looks like Clojure can only handle 1024 items on our simple channel before it topples over.
This is because our channel currently has no buffer, so every put! waits in the channel's put queue until something takes it, and that queue can only hold 1024 pending puts before it is full.
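As a rough sketch of that limit, the snippet below keeps calling put! on a fresh unbuffered channel until core.async refuses to queue another pending put. The helper name count-accepted-puts and the var accepted-puts are made up for illustration, and the code assumes core.async is on the classpath and required under the alias a.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: counts how many put! calls an unbuffered channel
;; accepts before core.async refuses to queue another pending put.
(defn count-accepted-puts []
  (let [ch (a/chan)]                  ; no buffer, so every put! waits for a taker
    (loop [n 0]
      (if (try
            (a/put! ch n)             ; queued as a pending put
            true
            (catch AssertionError _   ; thrown once the put queue is full
              false))
        (recur (inc n))
        n))))

;; With the limit described above this should come out as 1024.
(def accepted-puts (count-accepted-puts))
```

Catching the AssertionError like this is only meant to make the limit visible at the REPL. In real code the error is a sign that the producer is outrunning the consumer.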
We should perhaps try using a different type of channel.
What if we don’t really need to keep 100% of the data going into the channel?

Dropping Buffer Channels

We will create a dropping buffer, which will drop values if the channel buffer receives too many values.
What if we set the buffer to 2,000, which is way past the 1024 items we had last time?

(def droppingchan2 (chan (a/dropping-buffer 2000)))
; What happens if we try and call it 20,000 times?
(dotimes [i 20000] (put! droppingchan2 i))
; nil (nothing fell over)
(take! droppingchan2 println)
; 0

It seems to have solved our crashing problem, and we now have up to 2,000 items in our channel buffer at any given moment.
To answer why this doesn’t crash, we should examine how the dropping buffer actually works.
The channel we created has an internal buffer which is 2,000 items long.
If we add more than 2,000 items onto the buffer at any point in time, then the newest values are simply dropped until more space in our buffer is made available.
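To see the dropping behaviour at a scale that is easy to inspect, here is a small sketch with a three-item buffer instead of 2,000. The names small-dropping, kept and after-drain are illustrative only, and the code assumes core.async is required under the alias a.

```clojure
(require '[clojure.core.async :as a])

;; A dropping buffer that holds at most three values.
(def small-dropping (a/chan (a/dropping-buffer 3)))

;; Ten puts against a three-item buffer: 0, 1 and 2 are kept,
;; 3 to 9 arrive while the buffer is full and are dropped.
(dotimes [i 10] (a/put! small-dropping i))

;; poll! returns a buffered value immediately, or nil when nothing is available.
(def kept (vec (repeatedly 3 #(a/poll! small-dropping))))
(def after-drain (a/poll! small-dropping))

(println kept)        ; [0 1 2]
(println after-drain) ; nil
```

Using poll! here rather than take! keeps the example synchronous, so the results can be bound to vars and inspected directly.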
Anatomy of a core.async queue, from Rich Hickey’s channel talk

Internally, a core.async channel actually consists of three queues.
There is the buffer, which is how many items can remain on the channel at any given time, a put queue for adding items onto the channel, and a take queue for taking items off the channel.
The assertion failed error message happens when we fill up either the put queue or the take queue with more than 1024 call handlers.
In our previous example, our unbuffered channel had no internal buffer, and so any put requests were added to the put queue, which has a hard limit of 1024 items.
(def droppingchan (chan (a/dropping-buffer 2000)))
; Write 20,000 items of data to the channel
(dotimes [i 20000] (put! droppingchan i))
; nil (all seemed OK when adding to the buffer)
; What if we try overloading the take queue with 20,000 requests?
(dotimes [i 20000] (take! droppingchan println))
; Assert failed: No more than 1024 pending takes are allowed on a single channel.
; (< (.size takes) impl/MAX-QUEUE-SIZE)

What happened?

Well, our dropping buffer channel was working quite happily when we bombarded it with data.
It simply filled the 2,000 item buffer and then started dropping data.
However, when we tried to read from it 20,000 times we ran into the assertion failed message.
The reason is that only 2,000 values are sitting in the buffer. Once those have been handed out, every further take! has nothing to receive, so it waits in the take queue, and the take requests stack up until we hit the hard limit of 1024 pending requests.
At this point, we receive the assert failed message.
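The same effect can be reproduced on a smaller scale. The sketch below uses a five-item dropping buffer and counts how many take! callbacks actually receive a value before the take queue overflows. The names take-demo, delivered, on-value and overflow-error are hypothetical, and the code assumes core.async is required under the alias a.

```clojure
(require '[clojure.core.async :as a])

(def take-demo (a/chan (a/dropping-buffer 5)))
(dotimes [i 5] (a/put! take-demo i))     ; fill the buffer with 0 to 4

(def delivered (atom 0))
(defn on-value [_] (swap! delivered inc)) ; counts callbacks that got a value

;; The first 5 takes are served straight from the buffer.
;; The next 1024 find nothing and wait in the take queue.
(dotimes [_ (+ 5 1024)] (a/take! take-demo on-value))

;; One more take no longer fits in the take queue.
(def overflow-error
  (try
    (a/take! take-demo on-value)
    nil
    (catch AssertionError e e)))

(println @delivered)                               ; 5
(println (instance? AssertionError overflow-error)) ; true
```

Only the takes that found a value in the buffer ran their callback. The rest are still waiting for a producer, which is exactly the situation the 1024 limit guards against.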
Hmm, what if we really bombarded our channel? Could we overload it with put requests too?

(def droppingchan (chan (a/dropping-buffer 2000)))
; Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! droppingchan i))
; nil (our puts succeed, but only the first 2,000 items are put)
; Items 2,001 to 5,000,000 never make it onto the channel!

The answer is no.
It looks like the dropping buffer is doing its job and handling our 5,000,000 requests without falling over, and by handling, I mean it doesn’t crash regardless of how many times we put to it.
Our dropping buffer simply maintains a buffer of 2,000 items and drops any remaining data instead of putting it onto the channel, i.e. it only writes data to the channel when there is space in the buffer.
Sliding-Buffer Channels

What if we were more interested in the most recent items instead? Well, we can use a sliding-buffer for this.
(def slidingchan (chan (a/sliding-buffer 2000)))
; Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! slidingchan i))
; nil (we successfully wrote 5,000,000 items)
(take! slidingchan println)
; 4998000

Again, similar to our dropping-buffer, we can try and put to it as many times as we like without an error.
The channel maintains a sliding buffer of 2,000 items.
So when we call take! we get the oldest item still in the buffer first, which in this case is the value 4998000.
However, if we called take! 5,000,000 times then we would get another AssertionError, as we would soon hit the hard limit of 1024 requests in the take queue.
The key thing to remember with the operation of both the dropping-buffer and sliding-buffer is that they will fill the buffer and drop packets when bombarded with put requests.
The sliding buffer maintains a queue of the most recent n items, whilst the dropping buffer maintains a queue of the first n items until more space is made available on the queue (via a take operation).
These are two helpful mechanisms that help consumers deal with onslaughts of data from producers.
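A side-by-side sketch makes the difference concrete. The helper surviving-values below is hypothetical: it puts the numbers 0 to 9 onto a channel backed by the given three-item buffer and then reads back whatever is left. It assumes core.async is required under the alias a.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical helper: put 0..9 onto a channel that uses `buf`,
;; then read back the three values the buffer decided to keep.
(defn surviving-values [buf]
  (let [ch (a/chan buf)]
    (dotimes [i 10] (a/put! ch i))
    (vec (repeatedly 3 #(a/poll! ch)))))

(def dropped-result (surviving-values (a/dropping-buffer 3)))
(def sliding-result (surviving-values (a/sliding-buffer 3)))

(println dropped-result) ; [0 1 2], the first values to arrive
(println sliding-result) ; [7 8 9], the most recent values
```

Which buffer is the right choice depends on whether stale data is still useful to the consumer: a dropping buffer protects the earliest values, while a sliding buffer always favours the newest.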
Finally, we can also close our channels:

(close! simplebuffer)

Further Reading…

core.async channels are great.
We have only covered the basics here, but you can do all sorts of fun things with them.
You can mix multiple channels together and create pipelines, or distribute processing across multiple channels in parallel using transducers.
To learn more about core.async channels and the philosophy behind them, it is worth watching Rich Hickey’s talk on Clojure core.async channels.
Clojure core.async: Rich Hickey, the author of Clojure and designer of Datomic, is a software developer with over 20 years of experience in… (www.infoq.com)

Another really helpful introductory video on the core.async library can be found below, which features Timothy Baldridge explaining how to use the library.
It is well worth a watch if you can spare 40 minutes of your time, but if not then read on as we will be covering some of the basic concepts in this article!

If you are interested in working with core.async in production then I would also highly recommend this video by Zach Tellman, which gives some interesting insight into handling queues.
You can also use Zach’s open-source library, durable-queue to write data to disk as a cache for when a queue is being bombarded with requests.
This will help your systems to recover in the event of a process crash.
Finally, the core.async reference sheet from PurelyFunctional.tv is a great resource, as is the official documentation.
I would also recommend Clojure for the Brave and True which offers a fantastic introduction if you are getting started.