It’s common knowledge that loose coupling within an application leads to lower- maintenance systems and fewer bugs. To that end there have been numerous techniques and paradigms developed to achieve the loosely-coupled goal.
One such design pattern is the Observer (also called Publish-Subscribe in some circles). In its usual case, it defines a one-to-many dependency between objects, so when the ONE is modified, the MANY are notified.
When an Observable (also known as a Subject, Publisher or Provider) has changed, it notifies Observers that have indicated they are interested in said change by subscribing to the stream of data events published by the Observable.
It’s a useful pattern and Magik has a couple ways to make use of it. There’s the built-in changed/note_change mechanism as well as the databus.
However, although these mechanisms serve to implement a fairly loose coupling, they are still not as flexible as they could be. changed/note_change is tied to specific classes (which means at least one of the classes have to know about the other) and the databus is too tightly coupled, via data, to the communicating classes (its an abstraction that’s one level too low). Further, once set up for explicit data types, it’s not easy to make changes without opening the source code (and potentially breaking subscribers if the publisher changes data types on its end).
What we really need is a go-between that will abstract the Observable in a way that is flexible, can be configured on the fly via procedures and is able to not only notify Observers of changes, but allow chains of functionality to be invoked in order to handle things such as data transformation and filtering, among others.
So let’s see how something like that would work. I’ve written a module called beeble_observable that implements methods to abstract the Observer pattern.
Let’s use it to implement a very simple example.
Suppose we have a procedure that receives a stock’s price change. We can code it as follows.
_global updt_price<< _proc @updt_price(p_price) _return _self.emit(:price_update, p_price) _endproc
Obviously an actual implementation would have more going on in this procedure (such as associating a ticker symbol), but for our purposes, this implementation will suffice.
Notice how we use the emit method (line 4) to publish changes – it’s basically a wrapper around the built-in Magik changed method but returns the value passed in, so we can return the value directly without needing a temporary variable.
Next we’ll define a procedure to calculate a running average of prices.
_global average << _proc @average() _local l_num_in_avg << 0 _local l_avg << 0 _constant calc_avg << _proc @calc_avg(p_value) _import l_num_in_avg _import l_avg write("Average: ", l_avg) l_avg << (l_num_in_avg * l_avg + p_value) / (l_num_in_avg + 1) l_num_in_avg +<< 1 write("Num in Avg: ", l_num_in_avg) write("New Average: ", l_avg) _endproc _return _proc @new_avg(p_value) _import calc_avg calc_avg(p_value) _endproc _endproc
If you’ve read my article on Closures you’ll recognize a closure is being used to create a private procedure (line 7) and private variables (lines 4 and 5).
That’s all we need for now, so let’s hook them up.
Magik> o << beeble_observable.new(updt_price) a beeble_observable Magik> avg_price << average() Defining global avg_price proc new_avg(p_value) Magik> o.subscribe(_proc(e) avg_price(e.data) _endproc) a beeble_observable Magik> updt_price(200) Average: 0 Num in Avg: 1 New Average: 200 200 Magik> updt_price(100) Average: 200 Num in Avg: 2 New Average: 150 100 Magik> updt_price(600) Average: 150 Num in Avg: 3 New Average: 300 600 Magik>
See how cool that is? Not quite yet? No problem… let’s run through it.
In line 1 we create a new observable based on the updt_price procedure. Then in line 4 we return a procedure with its associated lexical environment (i.e. a closure).
In line 8 we subscribe to the observable and pass in an unnamed procedure that uses avg_price (from line 4) to calculate the current average. The unnamed procedure, passed to subscribe, receives a beeble_event object when it’s invoked that contains data accessible via e.data in line 8.
Subscribing to the observable, and passing in a procedure, activates the observer/observable mechanism. Now each time the observable changes (and publishes the change via the emit method), the observer procedure (in this case the unnamed procedure) is invoked.
So when we invoke updt_price (line 11), we see avg_price is automatically run (with data returned from invoking updt_price). This allows us to implement reactive programming so our applications react to changes published elsewhere. The fact we use procedures as both observable and observer gives us great flexibility in how we can configure behaviour.
We can also add another observer to updt_price.
_global updt_portfolio<< _proc @updt_portfolio(p_price) _local l_num_shares_owned << 10000 _local l_portfolio_value << l_num_shares_owned * p_price write("Portfolio Value:") l_portfolio_value.write_fixed_point(!terminal!, 2) write(newline_char) _return _self.emit(:portfolio_update, l_portfolio_value) _endproc
This procedure updates a mythical portfolio containing 10,000 shares. Let’s create another observable and subscribe.
Magik> o1 << beeble_observable.new(updt_price) a beeble_observable Magik> o1.subscribe(_proc(e) updt_portfolio(e.data) _endproc) a beeble_observable Magik> updt_price(125) Portfolio Value: 1250000.00 Average: 300 Num in Avg: 4 New Average: 1025/4 125 Magik>
See how updt_portfolio (line 8) is also run (in addition to avg_price in line 11) when we invoke updt_price? We could create as many observers as we like and they would all be notified when the observable (updt_price) was invoked. Each observer could perform a different behaviour, customized by the procedure passed into subscribe.
If you think about it, you can do all sorts of powerful things using this reactive mechanism. And because observers and observables are procedures, they can be switched out to change behaviour as necessary.
And when we’re done, simply unsubscribe to remove the observers.
Magik> o.unsubscribe() Magik> updt_price(50) Portfolio Value: 500000.00 50 Magik> o1.unsubscribe() Magik> updt_price(75) 75 Magik>
See how avg_price was no longer invoked after we unsubscribed in line 1? And notice how updt_portfolio stopped running after unsubscribing in line 9?
Pretty groovy eh?
But that’s not all… suppose we want to modify the value returned from the observable before it’s passed to the function in subscribe?
Wouldn’t that be useful? Sure it would. We’d be able to transform and filter the value in myriad ways. Come to think of it, we could use the pipe procedure we developed in the Magik Curry article to pipeline additional procedures.
So let’s do that.
Adding a Pipeline
But first I’m going to define another procedure.
_global show_recommendation << _proc @show_recommendation(p_portfolio_value) _constant SELL_THRESHOLD << 200000 _constant BUY_THRESHOLD << 100000 _local l_recommendation << "HOLD Shares" _if p_portfolio_value > SELL_THRESHOLD _then _local l_sale_value << p_portfolio_value - SELL_THRESHOLD l_recommendation << write_string("SELL $", l_sale_value, " worth of shares") _elif p_portfolio_value < BUY_THRESHOLD _then _local l_buy_value << BUY_THRESHOLD - p_portfolio_value l_recommendation << write_string("BUY $", l_buy_value, " worth of shares") _endif _return l_recommendation _endproc
This one simply looks at some hard-coded thresholds and returns a BUY, SELL or HOLD recommendation (by the way, this is a contrived example… so please don’t use it for actual stock trading).
Now let’s create an observable for updt_price again, but this time define a pipeline of procedures (in line 4).
Magik> o << beeble_observable.new(updt_price) a beeble_observable Magik> o.pipe(updt_portfolio, show_recommendation).subscribe(_proc(e) write(e.data) _endproc) a beeble_observable Magik> updt_price(100) Portfolio Value: 1000000.00 SELL $8.000e+005 worth of shares 100 Magik> updt_price(5) Portfolio Value: 50000.00 BUY $5.000e+004 worth of shares 5 Magik> updt_price(15) Portfolio Value: 150000.00 HOLD Shares 15 Magik>
Notice how we reused updt_portfolio in the pipeline (whereas previously we had used it in subscribe).
Now when we invoke updt_price we see the portfolio value as well as the recommendation because the value coming out of updt_price is fed into, first, updt_portfolio and then show_recommendation in the pipeline (line 4).
Once the pipeline has completed, the procedure passed to subscribe is invoked (in this case it simply writes out the event’s data — which is the output from show_recommendation because that was the last procedure in the pipeline to act on the chain of values).
But hold on just a minute! What if updt_price was now being passed U.S. dollars while our portfolio was denoted in Canadian dollars? That would create a bit of a mess… unless we did this…
_global convert_us_to_cdn << _proc @convert_us_to_cdn(p_us_dollar_value) _return p_us_dollar_value * 1.32 _endproc
And then this…
Magik> o.pipe(convert_us_to_cdn, updt_portfolio, show_recommendation).subscribe(_proc(e) write(e.data) _endproc) a beeble_observable Magik> updt_price(9) Portfolio Value: 118800.0 HOLD Shares 9 Magik>
I’ll leave it to you to confirm that $90,000 U.S. is $118,800 CDN given a 1.32 exchange rate (hint: it is).
So simply by adding the U.S. to Canadian dollar conversion procedure to the pipeline, in line 1, we’ve fixed the problem. Very simple and very readable: “convert U.S. dollars to Canadian dollars, update the portfolio then show the recommendation.” It’s like reading an English sentence.
A Better More Flexible Paradigm
Of course you can add as many procedures as necessary to the pipeline, thereby allowing you to write software that is reusable, configurable, reactive and asynchronous.
The possibilities are limited only by your creativity and imagination. Plus, standard operators (i.e. built-in procedures, such as map and filter) can be used in the pipeline to create an endless variety of configurations, thus allowing your program to process data in all sorts of different ways.
And finally, look at that pipeline again. It’s easy to understand what’s happening (because it uses a functional programming, declarative style) and it’s flexible enough to reconfigure simply by adding, removing or substituting procedures.
Further, if we change the implementation of any of these procedures (and with all those hard-coded values in the examples, we’d better be doing just that — retrieving data from a database and REST or GraphQL APIs), as long as the signatures remain the same, we wouldn’t have to change anything in the pipeline because the underlying implementation details are hidden within the procedures.
Once you’ve become familiar with the functional style of development and embraced reactive programming, your applications will be far easier to understand and debug, and that can lead to reduced maintenance and enhancement costs as well as a lower total cost of ownership over the life of your application.