Skip to content

Creating Observables in Scala #549

Closed
@samuelgruetter

Description

@samuelgruetter

The constructors for Observables in Scala have multiple problems. I'd like to start a systematic discussion about this, hoping to find a good solution.

First, a list of use cases that we eventually want to cover:

  • usecase01: "Observable.create, the mother of all factory methods": construct from an OnSubscribeFunc
  • usecase02: an empty Observable
  • usecase03: emit only 1 given element
  • usecase04: emit a given (vararg)list of elements
  • usecase05: emit all elements of an Iterable
  • usecase06: emit all elements of a possibly infinite Iterable
  • usecase07: emit the single item returned by a Future
  • usecase08: wrap a Java Observable (Scala only)
  • usecase09: emit only a given exception in onError
  • usecase10: emit a range of integers
  • usecase11: emit one asynchronously calculated element (a shorthand for constructing a Future and then applying usecase07

Note

  • Depending on the approach, one constructor might cover several usecases, since I tried to split them up as much as possible.
  • This list only contains "very important" constructors, which are eligible to be called Observable.apply. Others, which have names everyone agrees on, such as defer, never, interval, generate, etc, are not listed.

Now a first approach01 would be to use the same names as Java does:

/*usecase01:*/ public static <T> Observable<T> create(OnSubscribeFunc<T> func)
/*usecase02:*/ public static <T> Observable<T> empty()
/*usecase03:*/ public static <T> Observable<T> just(T value)
/*usecase04:*/ public static <T> Observable<T> from(T t1, ... T tN) /* 10 overloads */
/*usecase05:*/ public static <T> Observable<T> from(Iterable<? extends T> iterable)
/*usecase06:*/ /* not yet implemented */
/*usecase07:*/ public static <T> Observable<T> from(Future<? extends T> future)
/*usecase08:*/ /* NA */
/*usecase09:*/ public static <T> Observable<T> error(Throwable exception)
/*usecase10:*/ public static Observable<Integer> range(int start, int count)
/*usecase11:*/ /* not yet implemented */

We decided against this because this has problem01: Observable.apply is not used, so we don't exploit a nice feature of Scala.

That's why we implemented this approach02 (that's version 0.15.1):

/*usecase01:*/ def apply[T](func: Observer[T] => Subscription): Observable[T]
/*usecase02:*/ /* special case of usecase04 */
/*usecase03:*/ /* special case of usecase04 */
/*usecase04:*/ def apply[T](items: T*): Observable[T]
/*usecase05:*/ /* special case of usecase04, example: */ Observable(myList : _*)
/*usecase06:*/ /* not yet implemented, not even in Java */
/*usecase07:*/ /* not yet implemented */
/*usecase08:*/ def apply[T](observable: rx.Observable[_ <: T])
/*usecase09:*/ def apply[T](exception: Throwable): Observable[T]
/*usecase10:*/ def apply(range: Range): Observable[Int]
/*usecase11:*/ /* not yet implemented */

But this also turned out to have problems:

problem02: If I write this:

val o = Observable((observer: Observer[Int]) => { observer.onNext(42) /* no Subscription returned by mistake */ })

Then I don't get an error, but o is an Observable[Observer[Int] => Unit].

problem03: Observable(new Exception, new Exception) yields an Observable[Exception] with 2 elements and is not the same as Observable(new Exception) ++ Observable(new Exception), which yields an Observable[Nothing] with 0 elements, terminating with onError. Coursera students got confused about this.

problem04: The varargs apply and the OnSubscribeFunc apply clash in such a way that parameter type inference is lost:

val o1 = Observable(observer => { Subscription{} }) // Error: missing parameter type, should infer Observer[Nothing]
val o2 = Observable[Int](observer => { observer.onNext(1); Subscription{} }) // Error: missing parameter type, should infer Observer[Int]
val o3: Observable[Int] = Observable(observer => { observer.onNext(1); Subscription{} }) // works

problem05: Cannot easily construct an Observable emitting one Future, one Exception, or one Range.

problem06: It's possible to define both

def apply[T](items: T*): Observable[T] 
def apply[T](items: Iterable[T]): Observable[T]

but when I want to use it (eg Observable(List(1, 2, 3))), I get

ambiguous reference to overloaded definition, both method apply in object Observable of type [T](items: Iterable[T])rx.lang.scala.Observable[T] and method apply in object Observable of type [T](items: T*)rx.lang.scala.Observable[T] match argument types (List[Int])

We could also use implicit conversions, approach03:

  • usecase01: "Observable.create": call it Observable.apply
  • usecase02: List().toObservable
  • usecase03: List(1).toObservable
  • usecase04: List(1, 2, 3).toObservable
  • usecase05: myIterable.toObservable
  • usecase06: myIterable.toObservable
  • usecase07: myFuture.toObservable
  • usecase08: myJavaObservable.toObservable
  • usecase09: Observable.error(new Exception)
  • usecase10: (0 to 4).toObservable
  • usecase11: Observable.async{ ... } or something else

Here, usecases 02, 03, 04, 05, 06, and 10 would all be covered by one single implicit conversion from Iterable[T] to Observable[T].
However, I'm not yet sure if this approach would lead to other problems.

I invite everyone to post new approaches, and to comment on existing ones. And please use increasing unique ids for usecase, approach and problem, to keep our discussion tidy ;-)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions