It’s common knowledge that loose coupling within an application leads to lower-maintenance systems and fewer bugs. To that end, numerous techniques and paradigms have been developed to achieve loose coupling.
The Observer Pattern
One such design pattern is the Observer, also called Publish-Subscribe in some circles. (A small sidebar: strictly speaking, calling it Publish-Subscribe isn’t correct, because that pattern is usually implemented asynchronously, with some sort of middleware connecting multiple applications that don’t know about one another. The Observer pattern, on the other hand, is generally implemented in a single application where the Provider keeps a record of all subscribed Observers so it can notify them when required. Many people nevertheless use the Publish-Subscribe terminology to describe the Observer pattern, so keep that in mind; I will always use the term “Observer” when discussing the Observer pattern.)
In its usual case, the Observer pattern defines a one-to-many dependency between objects, so when the ONE is modified, the MANY are notified.
When a Publisher (also known as a Subject or Provider) has changed, it notifies Observers that have subscribed to the stream of data events published by the Publisher.
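To make the one-to-many idea concrete, here is a minimal sketch in Python (used only as a neutral illustration language; it is not Smallworld's changed/note_change mechanism or the databus). The Publisher class and the observer_a/observer_b functions are hypothetical names invented for this example.

```python
class Publisher:
    """The ONE: keeps a record of subscribed observers and notifies them."""

    def __init__(self):
        self._observers = []

    def subscribe(self, observer):
        self._observers.append(observer)

    def unsubscribe(self, observer):
        self._observers.remove(observer)

    def publish(self, value):
        # Push the change to every currently subscribed observer.
        # Iterate over a copy so observers may unsubscribe while notified.
        for observer in list(self._observers):
            observer(value)


log = []


def observer_a(value):
    log.append(("a", value))


def observer_b(value):
    log.append(("b", value))


publisher = Publisher()
publisher.subscribe(observer_a)
publisher.subscribe(observer_b)

publisher.publish("changed")        # both observers are notified
publisher.unsubscribe(observer_b)
publisher.publish("changed again")  # only observer_a is notified
```

Notice that the Publisher has to hold references to its observers, which is exactly the kind of coupling discussed in this article.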
It’s a useful pattern and Smallworld has a couple of ways to make use of it. There’s the built-in changed/note_change mechanism as well as the databus.
However, although these mechanisms serve to implement a fairly loose coupling, they are still not as flexible as they could be. changed/note_change is tied to specific objects (which means at least one of the classes has to know about the other) and the databus is too tightly coupled, via data, to the communicating objects (it’s an abstraction that’s one level too low). Further, once set up for explicit data types, it’s not easy to make changes without opening the source code (and potentially breaking subscribers if the publisher changes data types on its end).
What we really need is a go-between that will abstract the Observer pattern in a way that is flexible, can be configured on the fly via procedures and is able to not only notify Observers of changes, but allow chains of functionality to be invoked in order to handle things such as data transformation and filtering, among others. The Observer is a good start, but we need more.
The Iterator Pattern
An Iterator is an object that allows consumers to iterate through its contents. It hides internal implementation details behind a simple interface consisting of two methods: next() and has_next(), which respectively get the next item in the object’s sequence and indicate whether there are any items left in the sequence.
The Iterator is a pull-based pattern. When a consumer requires the next value, it is responsible for calling the Iterator’s next() method. Contrast this with the Observer pattern that is push-based (so when a Publisher has new data, it pushes the data to its subscribers — which are the consumers).
Iterators come in handy whenever we need to traverse a sequence of data items. However, as with Observers, there are limitations.
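As a rough illustration of the pull-based idea, here is a small Python sketch of the two-method interface described above. SequenceIterator is a hypothetical class written for this example, not part of any Smallworld library.

```python
class SequenceIterator:
    """Hides the underlying list behind has_next() and next()."""

    def __init__(self, items):
        self._items = list(items)
        self._index = 0

    def has_next(self):
        # True while there are still items left in the sequence.
        return self._index < len(self._items)

    def next(self):
        # Return the next item and advance the position.
        if not self.has_next():
            raise IndexError("no items left in the sequence")
        item = self._items[self._index]
        self._index += 1
        return item


iterator = SequenceIterator([1, 2, 3])
pulled = []

# The consumer drives the loop: it asks for each value when it wants one.
while iterator.has_next():
    pulled.append(iterator.next())
```

The consumer is in control here, which is the opposite of the Observer, where the Publisher decides when data is delivered.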
The Observable
Now if we take a step back and think about what programs do, we note they read, transform and write data. Therefore we can think of data as a stream of values (or in Reactive Extensions (Rx) terminology, a stream of events) that arrive over time and are manipulated by our programs.
With that in mind, we can define an Observable to represent a stream of events. In fact, an Observable combines the fundamental concepts of the Observer pattern with the Iterator pattern and does it in a Functional Programming manner.
The Observable packages events that arrive over time and transforms them into a stream of values that can be processed as a whole, rather than trying to handle each event as an isolated case. Once we have streams of data, it’s easy to manipulate them by merging, transforming or passing them around inside our programs.
As such, Observables are ideal for handling asynchronous and event-based patterns.
An Observable emits its values in order, much like the Iterator, but uses the Observer pattern’s push concept. As data elements become available, they are pushed to subscribers.
Another attribute of the Observable is that it can signal when the sequence has completed or if an error occurs. In these cases, the Observable automatically unsubscribes from its emitter and no further data can be accessed.
The Observable implements three methods: next(), complete() and fault() — note that fault() is called error() in most implementations, but since Magik objects usually have an error() method, I’ve changed it to minimize confusion.
next() emits a new value to all subscribers. complete() indicates there are no more data elements and unsubscribes. fault() is invoked when an error occurs in the Observable. It also automatically unsubscribes.
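The following Python sketch shows one way the next()/complete()/fault() contract could work, including the automatic unsubscribe after completion or a fault. It is an assumption-laden illustration, not the beeble_observable implementation: the Observable, Subscriber and RecordingObserver classes are hypothetical names made up for this example.

```python
class Subscriber:
    """Wraps an observer and stops forwarding once it is closed."""

    def __init__(self, observer):
        self._observer = observer
        self.closed = False

    def next(self, value):
        if not self.closed:
            self._observer.next(value)

    def complete(self):
        if not self.closed:
            self.closed = True  # completing unsubscribes automatically
            self._observer.complete()

    def fault(self, err):
        if not self.closed:
            self.closed = True  # a fault also unsubscribes automatically
            self._observer.fault(err)

    def unsubscribe(self):
        self.closed = True


class Observable:
    def __init__(self, producer):
        self._producer = producer  # called with a Subscriber on subscribe()

    def subscribe(self, observer):
        subscriber = Subscriber(observer)
        try:
            self._producer(subscriber)
        except Exception as err:
            subscriber.fault(err)
        return subscriber


class RecordingObserver:
    def __init__(self):
        self.events = []

    def next(self, value):
        self.events.append(("next", value))

    def complete(self):
        self.events.append(("complete",))

    def fault(self, err):
        self.events.append(("fault", str(err)))


def producer(subscriber):
    subscriber.next(1)
    subscriber.next(2)
    subscriber.complete()
    subscriber.next(3)  # ignored: the sequence has already completed


observer = RecordingObserver()
subscription = Observable(producer).subscribe(observer)
```

In this sketch the value 3 never reaches the observer, because complete() has already closed the subscription.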
beeble_observable
So let’s see how something like this would work. I’ve written a module called beeble_observable that implements methods for the Observable pattern.
Let’s use it to implement a very simple example.
First, we’ll define an observer object. We’ll use a beeble_object and define its next() and complete() methods. If we omit a method (fault() in this case), a default implementation will be provided.
_block
    _global observer_fn

    observer_fn << beeble_object.new()

    observer_fn.next << _proc @next(val)
        write("Observer: ", val)
    _endproc

    observer_fn.complete << _proc @complete()
        write("Observer Completed")
    _endproc
_endblock
Right now our methods simply write information to the console, but in an actual application these methods would implement business logic functionality by calling additional functions.
Now that we have an observer, let’s create an observable.
Magik> integers_o << from({1,2,3,10,20,30,42,100,200,1123})
a beeble_observable
Magik> s << integers_o.subscribe(observer_fn)
Observer: 1
Observer: 2
Observer: 3
Observer: 10
Observer: 20
Observer: 30
Observer: 42
Observer: 100
Observer: 200
Observer: 1123
Observer Completed
a beeble_subscriber
Magik>
The first command creates an observable using the from() creation function. Creation functions are convenient for making new observables without having to go through the steps of instantiating the beeble_observable class and providing appropriate logic.
Note the convention is to name variables pointing to observables with an “_o” suffix.
Side Note
Something else to note is I’m using from() as a global procedure for demonstration purposes only. Since beeble_observable is part of the MagikFP library, it is available under the MagikFP namespace FP (i.e. FP.rx_magik.fns.from()). However, in order to keep these examples as uncluttered as possible, I’m not going to add the namespace.
The second command subscribes to the observable, passing in our observer function, in order to run it. The act of subscribing starts the observable. In this case, because we used from(), the observable will iterate through the values passed to it and emit each one using next(). Once that’s done, it will emit the complete() notification in order to end.
As you can see, our observer function’s next() method is called once for each emission (the ten “Observer:” lines) and finally complete() is called (the “Observer Completed” line) once the observable completes.
The subscribe() method returns a beeble_subscriber object, which we capture in the variable s and can use to unsubscribe (i.e. s.unsubscribe()).
And that, in a nutshell, is an example of creating and using an observable.
So… what do you think?
Can you see how useful and powerful this concept is?
Not yet?
That’s okay… keep reading and we’ll delve into a real-world example to illustrate the power and flexibility of observables.
And speaking of flexibility, let’s look at an example that demonstrates it. First we’ll create a factory that returns a function to keep track of sums.
_global sum_factory <<
    _proc @sum_factory()
        _local sum << 0
        _return _proc @sum(p_value)
            _import sum
            sum +<< p_value
            _return sum
        _endproc
    _endproc

_global a_summer << sum_factory()

_global sum_observer

sum_observer << beeble_object.new()

sum_observer.next << _proc @next(val)
    _import a_summer
    write("Adding: ", val)
    a_summer(val)
_endproc

sum_observer.complete << _proc @complete()
    _import a_summer
    write("Sum equals: ", a_summer(0))
_endproc

sum_observer.fault << _proc @fault(err)
    write("Fault: ", err)
_endproc
The factory makes use of a closure to maintain state.
Next we define a new observer object, sum_observer, and create next(), complete() and fault() methods on it.
Note how we import the a_summer() function, created by calling sum_factory(), and use it in the next() and complete() methods.
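If closures are new to you, this short Python sketch shows the same factory idea in a neutral language. It is only an analogy to the Magik code above; the inner name add and the variable another are hypothetical and chosen for the example.

```python
def sum_factory():
    total = 0  # state captured by the closure below

    def add(value):
        nonlocal total
        total += value
        return total

    return add


a_summer = sum_factory()
for value in (1, 2, 3, 10, 20, 30, 42, 100, 200, 1123):
    a_summer(value)

# Adding 0 reads the running total without changing it.
running_total = a_summer(0)

# Each call to the factory creates an independent running total.
another = sum_factory()
independent_total = another(5)
```

The running total lives inside the function returned by the factory, so a_summer keeps accumulating for as long as it exists, while another starts again from zero.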
Now let’s subscribe to our observable using the new observer function…
Magik> s << integers_o.subscribe(sum_observer)
Adding: 1
Adding: 2
Adding: 3
Adding: 10
Adding: 20
Adding: 30
Adding: 42
Adding: 100
Adding: 200
Adding: 1123
Sum equals: 1531
a beeble_subscriber
Magik>
See how we changed the functionality of the observable simply by providing the new observer function in line 1? So that’s pretty impressive. But there’s more…
We can also transform the stream of emissions using a pipe.
First we’ll define a predicate function.
_global even_pred <<
    _proc @even_pred(p_val)
        _return p_val _mod 2 = 0
    _endproc
Now we can use this predicate in the pipe, via the filter operator, to work only with even numbers. Operators are just pure functions that take an observable as a parameter, run some functionality and return a new observable.
Magik> s << integers_o.pipe(filter(even_pred)).subscribe(sum_observer)
Adding: 2
Adding: 10
Adding: 20
Adding: 30
Adding: 42
Adding: 100
Adding: 200
Sum equals: 404
a beeble_subscriber
Magik>
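And just like that we’ve filtered out all the odd values before they reached sum_observer, so it now adds only the even numbers. (Because a_summer keeps its running total in a closure, totals like this one assume a_summer was re-created with sum_factory() before the run.)
But what if we just wanted to add the first 3 even numbers?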
Magik> s << integers_o.pipe(filter(even_pred), take(3)).subscribe(sum_observer)
Adding: 2
Adding: 10
Adding: 20
Sum equals: 32
a beeble_subscriber
Magik>
Or what if we wanted to double those numbers before adding them…
Magik> s << integers_o.pipe(filter(even_pred), take(3), map(_proc(val) >> val * 2 _endproc)).subscribe(sum_observer)
Adding: 4
Adding: 20
Adding: 40
Sum equals: 64
a beeble_subscriber
Magik>
As you can see, we can achieve quite different results using built-in operators such as filter, map and take (along with simple functions) to manipulate the output from observables.
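To show what “operators are functions that take an observable and return a new observable” can look like under the hood, here is a simplified Python sketch of pipe() with filter, take and map. It is my own illustration under stated assumptions, not how beeble_observable is written: the class and the from_, filter_ and map_ names (with trailing underscores to avoid Python built-ins) are hypothetical, and this take() simply ignores later values rather than unsubscribing from its source.

```python
class Observable:
    def __init__(self, producer):
        self._producer = producer  # function(on_next, on_complete)

    def subscribe(self, on_next, on_complete=lambda: None):
        self._producer(on_next, on_complete)

    def pipe(self, *operators):
        # Feed the result of each operator into the next one.
        result = self
        for operator in operators:
            result = operator(result)
        return result


def from_(values):
    def producer(on_next, on_complete):
        for value in values:
            on_next(value)
        on_complete()
    return Observable(producer)


def filter_(predicate):
    def operator(source):
        def producer(on_next, on_complete):
            def handle(value):
                if predicate(value):
                    on_next(value)
            source.subscribe(handle, on_complete)
        return Observable(producer)
    return operator


def map_(transform):
    def operator(source):
        def producer(on_next, on_complete):
            source.subscribe(lambda value: on_next(transform(value)), on_complete)
        return Observable(producer)
    return operator


def take(count):
    def operator(source):
        def producer(on_next, on_complete):
            state = {"seen": 0, "done": False}

            def finish():
                if not state["done"]:
                    state["done"] = True
                    on_complete()

            def handle(value):
                if state["done"]:
                    return  # simplification: ignore values after completion
                state["seen"] += 1
                on_next(value)
                if state["seen"] >= count:
                    finish()

            if count <= 0:
                finish()
                return
            source.subscribe(handle, finish)
        return Observable(producer)
    return operator


def collect(observable):
    seen = []
    observable.subscribe(seen.append)
    return seen


def is_even(value):
    return value % 2 == 0


integers_o = from_([1, 2, 3, 10, 20, 30, 42, 100, 200, 1123])

evens = collect(integers_o.pipe(filter_(is_even)))
first_three = collect(integers_o.pipe(filter_(is_even), take(3)))
doubled = collect(integers_o.pipe(filter_(is_even), take(3), map_(lambda v: v * 2)))
```

The three pipelines mirror the filter, take and map examples above: the collected values sum to 404, 32 and 64 respectively.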
A Real World Example
Hopefully you’ve understood the examples above, but if not, go back and read through them again until you get it. You’ll need that knowledge to understand the next example.
This time we’ll set up a local GSS instance hosting a REST API endpoint that returns various objects from the Cambridge database. It’s beyond the scope of this article to explain that part, but if you’d like the details, check out the article on running GSS locally and this one about writing better GSS services.
As before, we’ll create a new observable — but this time we’ll use another creation function (from_rest_api()) designed to make REST Service calls.
Magik> rest_o << from_rest_api("http://localhost:7771/cambridge-api/all-objects")
a beeble_observable
Magik>
Note we’ve provided the URI to the endpoint (in this case it’s running on my local machine). The observable waits until we subscribe, then makes a request to the REST endpoint and emits each object that’s returned. We handle each emission in our observer’s next() method.
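The “waits until we subscribe” behaviour can be sketched in a few lines of Python. This is only an illustration of deferred execution, not the from_rest_api() implementation: from_fetch and fake_fetch are hypothetical names, and fake_fetch stands in for a real HTTP request so the example runs without a server.

```python
class Observable:
    def __init__(self, producer):
        self._producer = producer  # function(on_next, on_complete)

    def subscribe(self, on_next, on_complete=lambda: None):
        self._producer(on_next, on_complete)


def from_fetch(fetch):
    """Hypothetical creation function: fetch() runs only on subscribe()."""
    def producer(on_next, on_complete):
        for item in fetch():
            on_next(item)
        on_complete()
    return Observable(producer)


calls = []


def fake_fetch():
    # Stand-in for an HTTP request; records that it was invoked.
    calls.append("request")
    return [{"name": "Acorn Guest House"}, {"name": "All Seasons"}]


rest_o = from_fetch(fake_fetch)
calls_before_subscribe = len(calls)  # nothing has been requested yet

names = []
rest_o.subscribe(lambda obj: names.append(obj["name"]))
calls_after_subscribe = len(calls)   # the request ran during subscribe()
```

Creating the observable only describes the work; subscribing is what triggers it.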
Here’s the observer object and methods…
_global camb_observer_fn

camb_observer_fn << beeble_object.new()

camb_observer_fn.next << _proc @next(val)
    print(val)
    write(newline_char)
_endproc

camb_observer_fn.complete << _proc @complete()
    write("Observer Completed")
_endproc
We run the observable by subscribing to it with camb_observer_fn…
Magik> rest_o.subscribe(camb_observer_fn)
{"type":"Guest House","address2":"Cambridge","comments":"unset","address1":"154 Chesterton Road","name":"Acorn Guest House"}
{"type":"Guest House","address2":"Cambridge","comments":"unset","address1":"219 Chesterton Road","name":"All Seasons"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"Chesterton Road","name":"Arundel House"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"74 Chesterton Road","name":"Ashley Hotel"}
{"type":"Guest House","address2":"Cambridge","comments":"","address1":"17 Elizabeth Way","name":"Cam Guest House"}
{"type":"Guest House","address2":"Cambridge","comments":"unset","address1":"166 Chesterton Road","name":"De Freville House"}
.
.
.
Observer Completed
a beeble_subscriber
Magik>
…and our observer prints each object returned from the endpoint and writes the completed message once the observable completes.
But there are 74 objects returned (I truncated the rest). What if we only wanted to see the first 4 objects? Remember how we did that?
Yep… we use a pipe and the take operator.
So let’s do that.
Magik> rest_o.pipe(take(4)).subscribe(camb_observer_fn)
{"type":"Guest House","address2":"Cambridge","comments":"unset","address1":"154 Chesterton Road","name":"Acorn Guest House"}
{"type":"Guest House","address2":"Cambridge","comments":"unset","address1":"219 Chesterton Road","name":"All Seasons"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"Chesterton Road","name":"Arundel House"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"74 Chesterton Road","name":"Ashley Hotel"}
Observer Completed
a beeble_subscriber
Magik>
Now let’s clean up the returned objects. First, some comments have the value “unset”. We’ll convert all “unset” values to empty strings. Here’s the function to do that…
_global unset_to_empty_string <<
    _proc @unset_to_empty_string(p_obj)
        _constant NEW_OBJ << p_obj.copy()
        _if (nm << NEW_OBJ.comments) _is _unset _orif nm = "unset"
        _then
            NEW_OBJ.make_mutable(:comments)
            NEW_OBJ.comments << ""
        _endif
        _return NEW_OBJ
    _endproc
Then we use the map operator in the pipe and, just like that, we’ve cleaned up unsets in the comments.
Magik> rest_o.pipe(take(4), map(unset_to_empty_string)).subscribe(camb_observer_fn)
{"type":"Guest House","address2":"Cambridge","name":"Acorn Guest House","comments":"","address1":"154 Chesterton Road"}
{"type":"Guest House","address2":"Cambridge","name":"All Seasons","comments":"","address1":"219 Chesterton Road"}
{"type":"Hotel","address2":"Cambridge","name":"Arundel House","comments":"","address1":"Chesterton Road"}
{"type":"Hotel","address2":"Cambridge","name":"Ashley Hotel","comments":"","address1":"74 Chesterton Road"}
Observer Completed
a beeble_subscriber
Magik>
Now let’s transform names to be all uppercase. As before, we define a function…
_global uppercase_name <<
    _proc @uppercase_name(p_obj)
        _constant NEW_OBJ << p_obj.copy()
        _if (nm << NEW_OBJ.name) _isnt _unset _andif nm <> "unset"
        _then
            NEW_OBJ.make_mutable(:name)
            NEW_OBJ.name << p_obj.name.uppercase
        _endif
        _return NEW_OBJ
    _endproc
…and use it with map in a pipe…
Magik> rest_o.pipe(take(4), map(unset_to_empty_string), map(uppercase_name)).subscribe(camb_observer_fn)
{"type":"Guest House","address2":"Cambridge","name":"ACORN GUEST HOUSE","comments":"","address1":"154 Chesterton Road"}
{"type":"Guest House","address2":"Cambridge","name":"ALL SEASONS","comments":"","address1":"219 Chesterton Road"}
{"type":"Hotel","address2":"Cambridge","name":"ARUNDEL HOUSE","comments":"","address1":"Chesterton Road"}
{"type":"Hotel","address2":"Cambridge","name":"ASHLEY HOTEL","comments":"","address1":"74 Chesterton Road"}
Observer Completed
a beeble_subscriber
Magik>
Finally, we’ll add a timestamp to each received object. Here’s the function…
_global add_timestamp <<
    _proc @add_timestamp(p_obj)
        _constant NEW_OBJ << p_obj.copy()
        NEW_OBJ.timestamp << date_time.now()
        _return NEW_OBJ
    _endproc
And here’s how we add it to the pipe…
Magik> rest_o.pipe(take(4), map(unset_to_empty_string), map(uppercase_name), map(add_timestamp)).subscribe(camb_observer_fn)
{"type":"Guest House","address2":"Cambridge","comments":"","address1":"154 Chesterton Road","name":"ACORN GUEST HOUSE","timestamp":"14/01/2020 07:08:49"}
{"type":"Guest House","address2":"Cambridge","comments":"","address1":"219 Chesterton Road","name":"ALL SEASONS","timestamp":"14/01/2020 07:08:49"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"Chesterton Road","name":"ARUNDEL HOUSE","timestamp":"14/01/2020 07:08:49"}
{"type":"Hotel","address2":"Cambridge","comments":"","address1":"74 Chesterton Road","name":"ASHLEY HOTEL","timestamp":"14/01/2020 07:08:49"}
Observer Completed
a beeble_subscriber
Magik>
I hope these examples have demonstrated how observables make it very easy to handle asynchronous events (such as REST Service calls) and allow us to manipulate and transform returned values in myriad ways.
There are a number of useful creation functions and even more operators. And since Observer functions let us react to events without having to continuously poll for them or create tons of nested callbacks, our code becomes cleaner and easier to understand.
A Better, More Flexible Paradigm
Of course you can add as many standard operators (e.g. built-in procedures such as map and filter) as necessary to the pipeline, thereby allowing you to write software that is reusable, configurable, reactive and asynchronous.
The possibilities are limited only by your creativity and imagination.
Plus, if you need functionality not provided by a standard operator (which probably won’t be the case), you can create your own, because operators are simply pure functions that receive an observable and return an observable.
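As a sketch of what writing your own operator might involve, here is a small Python example of a custom running_total() operator. Everything in it is hypothetical and written for illustration (the minimal Observable class, from_ and running_total); the real MagikFP operator signature may well differ.

```python
class Observable:
    def __init__(self, producer):
        self._producer = producer  # function(on_next, on_complete)

    def subscribe(self, on_next, on_complete=lambda: None):
        self._producer(on_next, on_complete)

    def pipe(self, *operators):
        result = self
        for operator in operators:
            result = operator(result)
        return result


def from_(values):
    def producer(on_next, on_complete):
        for value in values:
            on_next(value)
        on_complete()
    return Observable(producer)


def running_total():
    """Hypothetical custom operator: emits the running sum of the source."""
    def operator(source):
        def producer(on_next, on_complete):
            total = 0  # fresh state for every subscription

            def handle(value):
                nonlocal total
                total += value
                on_next(total)

            source.subscribe(handle, on_complete)
        return Observable(producer)
    return operator


totals_o = from_([2, 10, 20]).pipe(running_total())

totals = []
totals_o.subscribe(totals.append)

# Subscribing a second time starts again from zero.
totals_again = []
totals_o.subscribe(totals_again.append)
```

The operator receives the source observable and returns a new one, leaving the source untouched. Keeping the state inside the producer means each subscription gets its own running total.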
This ability to combine standard operators with custom-written ones in the pipeline lets you create an endless variety of configurations, thus allowing your program to process data in all sorts of different ways.
Now look at that pipeline again…
map(unset_to_empty_string), map(uppercase_name), map(add_timestamp)
It’s easy to understand what’s happening (because it uses a functional programming, declarative style) and it’s flexible enough to reconfigure simply by adding, removing or substituting operators and procedures.
Further, if we change the implementation of any of these procedures, as long as the signatures remain the same, we won’t have to change anything in the pipeline because the underlying implementation details are hidden within the procedures.
What I’ve just described only scratches the surface of what’s possible with Observables. You can use them as an enhanced Observer (which is powerful in its own right), but the pinnacle of power comes from the operators we can use to manipulate event streams and the fact Observables can also be used as sources for other observables. I’ll go into more detail in an upcoming article, but for now, if you understand the concepts in this article, you’ll have a strong foundation upon which to build.
Here’s a marble diagram from the ReactiveX site that hints at the true power of Observables.

Once you’ve become familiar with the functional style of development and embraced reactive programming, your applications will be far easier to understand and debug, and that can lead to reduced maintenance and enhancement costs as well as a lower total cost of ownership over the life of your application.