[…] it uses a binary heap to represent its task queue, so the cost to schedule a task is O(log n), where n is the number of concurrently scheduled tasks.
The original post explains the idea and motivation behind it, together with examples of how it was used.
As an exercise, I decided to implement it, but with a small twist: instead of using blocking operations (Lock, etc.), I tried to make it a non-blocking algorithm. There are several ways to accomplish that, but I went for compare-and-swap operations, or CAS.
The result can be found in this gist.
The general idea is that your timer is backed by an indexed sequence, for instance an Array. Scheduled tasks are mapped to different indexes based on their deadlines, and each index represents a time frame, for instance the tasks to be executed in the next 50 ms.
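The mapping from deadline to index might look like the sketch below. The names (`slotFor`, `tickMs`, `numSlots`, `cursor`) are my assumptions for illustration, not the names used in the gist:

```java
class WheelSketch {
    // Map a delay to a slot, assuming a wheel of `numSlots` slots where each
    // slot covers `tickMs` milliseconds; `cursor` is the slot the scheduler
    // thread is currently at.
    static int slotFor(long delayMs, int cursor, int numSlots, long tickMs) {
        long ticks = delayMs / tickMs;           // ticks until the deadline
        return (int) ((cursor + ticks) % numSlots);
    }
}
```

With a 50 ms tick and 8 slots, a task due in 100 ms lands two slots ahead of the cursor, and a task due in a full cycle (400 ms) wraps around to the cursor's own slot.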
In my implementation, there are two methods that make it work:

- add, as the name states, schedules a task. Each task has a function to be executed and a long representing in how many milliseconds that function should be executed. This method is thread-safe;
- submitCurrentTasks runs on a separate thread, sleeping between intervals and running tasks that are due for execution on a thread pool.
When you add a task, the first step is calculating which slot of the array the task should be placed in. If the slot is the same as the last one executed, the new task is executed right away. If not, it is placed in the appropriate slot.
By incrementing the AtomicInteger of the chosen slot, we are, let's say, "signaling" the thread running submitCurrentTasks that a new task will be added to that slot. If the number returned is smaller than the defined maximum, we place the task at the appropriate index. Otherwise, we roll back and decrement the AtomicInteger.
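The increment-then-check-then-maybe-rollback step could be sketched like this. The slot layout and the capacity field are assumptions of mine (the gist is in Scala), but the AtomicInteger dance is the one described above:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class SlotSketch {
    final int maxTasks;                         // assumed per-slot capacity
    final AtomicInteger count;                  // tasks "signaled" for this slot
    final AtomicReferenceArray<Runnable> tasks;

    SlotSketch(int maxTasks) {
        this.maxTasks = maxTasks;
        this.count = new AtomicInteger(0);
        this.tasks = new AtomicReferenceArray<>(maxTasks);
    }

    // Try to place a task in this slot; false means the slot is full.
    boolean add(Runnable task) {
        // Reserve an index; the increment also signals the scheduler thread
        // that a task is on its way to this slot.
        int index = count.getAndIncrement();
        if (index < maxTasks) {
            tasks.set(index, task);
            return true;
        }
        count.decrementAndGet();                // full: roll the reservation back
        return false;
    }
}
```

Note the window this opens: between `getAndIncrement` and `tasks.set`, the counter says a task exists that is not in the array yet, which is exactly what the null check on the scheduler side has to tolerate.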
On the thread running submitCurrentTasks, once we are awakened, we move our cursor to the next slot and copy the tasks in that slot to a secondary list. To know how many tasks there are, we use the same AtomicInteger that was used by add. As we copy the tasks, we check through a null check that each one is really there, and set it to null to mark that it was copied. If no items were added to that slot in the meantime, the number of tasks in the secondary list will be the same as the value of the AtomicInteger. If that's the case, the compare-and-set operation will succeed and we will be able to run the tasks. Otherwise, it will fail and we will repeat the steps until the values match.
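A self-contained sketch of that drain loop, with the slot reduced to its two pieces of shared state (layout and names assumed by me, not taken from the gist):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class DrainSketch {
    // One slot's shared state: the counter incremented by add, and the tasks.
    static final AtomicInteger count = new AtomicInteger(0);
    static final AtomicReferenceArray<Runnable> tasks = new AtomicReferenceArray<>(16);

    static List<Runnable> drain() {
        List<Runnable> copied = new ArrayList<>();
        while (true) {
            int expected = count.get();
            for (int i = 0; i < tasks.length(); i++) {
                Runnable t = tasks.get(i);
                if (t != null) {            // really there? (a writer may still be placing it)
                    copied.add(t);
                    tasks.set(i, null);     // mark as copied
                }
            }
            // If nothing arrived in the meantime, the sizes match and the CAS
            // resets the counter; otherwise another pass picks up the stragglers.
            if (copied.size() == expected && count.compareAndSet(expected, 0)) {
                return copied;
            }
        }
    }
}
```

The retry loop is the non-blocking trade-off in miniature: instead of holding a lock while adders wait, the scheduler thread spins until the counter and the copied list agree.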
Of course, this approach has a lot of drawbacks and only works for very particular cases, as explained in the original article. But this implementation has some special behavior of its own.
If you try to add a new task, and the cursor moves between the time you calculate the task's slot and the time you finally place the task in the slot, you might end up missing its intended execution deadline, and the task will instead take a whole extra cycle to execute. To give an example, let's say that the current cursor is 0: you add a task and calculate that it should go to slot 1, but before you place it in the slot, the scheduler thread moves the cursor to 1 and submits all of that slot's tasks.
On the other hand, it won't delay the execution of other tasks. With a blocking algorithm, all the tasks in slot 1 would have their execution delayed until you finally placed the new task in its slot. Furthermore (though I haven't benchmarked it yet), non-blocking algorithms tend to be faster than their locking counterparts, as they cause fewer context switches.
But to be honest, parallelism is a tricky world about which I know very little, and I'm not entirely sure this implementation would always work… The question here is: given that a task is added to the backing array, will the scheduler thread see that change?
For instance, when you have a variable that is modified by different threads in Java, you might use the volatile keyword to let the compiler know about that. volatile makes reads and writes to that variable pass through "main memory" instead of the thread's cache, meaning that changes are always visible between threads.
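A minimal illustration of the pattern (not code from the gist) is the classic stop flag: without volatile, the looping thread may keep reading a stale cached value and never observe the write made by the stopping thread.

```java
class StopFlag {
    // volatile: reads and writes go through main memory, so a change made by
    // one thread is visible to the others.
    private volatile boolean running = true;

    boolean isRunning() { return running; }

    void stop() { running = false; }

    void loopUntilStopped() {
        while (running) {
            // do work; the volatile read guarantees stop() is eventually seen
            Thread.onSpinWait();
        }
    }
}
```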
Update: the Scala file can also be obtained here.