Skip to content

Scala Adaptor #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Oct 9, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8856077
add Documentation section to README
samuelgruetter Sep 25, 2013
cc8b756
move examples from `src/main/scala` to `src/examples/scala`
samuelgruetter Sep 25, 2013
01634bc
move MovieLibUsage.java from project rxjava-scala-java to project rxj…
samuelgruetter Sep 25, 2013
450be12
how to add RxJava core to scaladoc input
samuelgruetter Sep 25, 2013
f20a8eb
make Subscription an implicit value class
samuelgruetter Sep 25, 2013
a5fe8dc
Opening/Closing, Timestamped with unapply, BlockingObservable with Wi…
samuelgruetter Sep 25, 2013
6f56788
remove `implicit` from Timestamped
samuelgruetter Sep 25, 2013
de7ac42
work around scalac bug by renaming companion object `Timestamped` to …
samuelgruetter Sep 25, 2013
14369fc
Timestamped and its companion can now have the same name
samuelgruetter Sep 26, 2013
f7ab0b3
scaladoc for Observer, Subject, Scheduler, and
samuelgruetter Sep 26, 2013
add46e6
put unapply of Notifications into companions
samuelgruetter Sep 26, 2013
f1a3fb2
add rx.lang.scala.concurrency.Schedulers
samuelgruetter Sep 26, 2013
d3f933c
improve scaladoc
samuelgruetter Sep 26, 2013
45d3523
update README and TODO
samuelgruetter Sep 26, 2013
af7d056
javadoc -> scaladoc translation
samuelgruetter Sep 27, 2013
79ce193
start TestScheduler
samuelgruetter Sep 27, 2013
7c4d23a
work on Schedulers
samuelgruetter Sep 27, 2013
dcbae87
Scheduler and TestScheduler
samuelgruetter Sep 27, 2013
aae04c1
add exists and isEmpty
samuelgruetter Sep 29, 2013
70c14a3
TestScheduler example
samuelgruetter Sep 29, 2013
9e81deb
make all constructors private
samuelgruetter Sep 29, 2013
6635e61
update TODO
samuelgruetter Sep 29, 2013
67449fb
allow to construct Observables ina similar way as futures
samuelgruetter Sep 30, 2013
866d0b3
implicit function conversion hack just for nicer scaladoc
samuelgruetter Oct 1, 2013
06797f8
make defer implementation more explicit
samuelgruetter Oct 1, 2013
373f5b3
apply review patch
samuelgruetter Oct 1, 2013
2654f60
head, headOrElse, zip(3), zip(4)
samuelgruetter Oct 1, 2013
df0436c
changes from review + scaladoc improvements
samuelgruetter Oct 1, 2013
b90634d
improve scaladoc
samuelgruetter Oct 2, 2013
01f5464
rename fold to foldLeft
samuelgruetter Oct 2, 2013
f3734bf
remove takeWhileWithIndex
samuelgruetter Oct 2, 2013
c66b2eb
use Tuple instead of Timestamped
samuelgruetter Oct 2, 2013
677f84c
update completeness test
samuelgruetter Oct 2, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions language-adaptors/rxjava-scala-java/README.md

This file was deleted.

32 changes: 0 additions & 32 deletions language-adaptors/rxjava-scala-java/build.gradle

This file was deleted.

9 changes: 8 additions & 1 deletion language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blo

Scala code using Rx should only import members from `rx.lang.scala` and below.

Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala).

## Documentation

The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable).

You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory.

Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it.


## Binaries
Expand Down
17 changes: 11 additions & 6 deletions language-adaptors/rxjava-scala/TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ TODOs for Scala Adapter

This is a (probably incomplete) list of what still needs to be done in the Scala adaptor:

* mirror complete Java package structure in Scala
* objects for classes with static methods or singletons (e.g. Schedulers, Subscriptions)
* Notification as a case class
* integrating Scala Futures, should there be a common base interface for Futures and Observables?
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, exists, tails, ...
* Integrating Scala Futures: Should there be a common base interface for Futures and Observables? And if all subscribers of an Observable wrapping a Future unsubscribe, the Future should be cancelled, but Futures do not support cancellation.
* Add methods present in Scala collections library, but not in RxJava, e.g. aggregate à la Scala, collect, tails, ...
* combineLatest with arities > 2
* decide where the MovieLib/MovieLibUsage (use Scala code from Java code) example should live and make sure gradle builds it in the right order
* Implicit schedulers?
* Avoid text duplication in scaladoc using templates, add examples, distinction between use case signature and full signature
* other small TODOs


(Implicit) schedulers for interval: Options:

```scala
def interval(duration: Duration)(implicit scheduler: Scheduler): Observable[Long]
def interval(duration: Duration)(scheduler: Scheduler): Observable[Long]
def interval(scheduler: Scheduler)(duration: Duration): Observable[Long]
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] && def interval(duration: Duration): Observable[Long]
````
29 changes: 29 additions & 0 deletions language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,30 @@ tasks.withType(ScalaCompile) {
}

sourceSets {
main {
scala {
srcDir 'src/main/scala'
}
}
test {
scala {
srcDir 'src/main/scala'
srcDir 'src/test/scala'
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
}
java.srcDirs = []
}
examples {
// It seems that in Gradle, the dependency "compileScala depends on compileJava" is hardcoded,
// or at least not meant to be removed.
// However, compileScala also runs javac at the very end, so we just add the Java sources to
// the scala source set:
scala {
srcDir 'src/examples/scala'
srcDir 'src/examples/java'
}
java.srcDirs = []
}
}

Expand All @@ -34,6 +54,15 @@ tasks.compileScala {
classpath = classpath + (configurations.compile + configurations.provided)
}

tasks.compileExamplesScala {
classpath = classpath + files(compileScala.destinationDir) + (configurations.compile + configurations.provided)
}

// Add RxJava core to Scaladoc input:
// tasks.scaladoc.source(project(':rxjava-core').tasks.getByPath(':rxjava-core:compileJava').source)
// println("-------")
// println(tasks.scaladoc.source.asPath)

task test(overwrite: true, dependsOn: testClasses) << {
ant.taskdef(name: 'scalatest',
classname: 'org.scalatest.tools.ScalaTestAntTask',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import rx.lang.scala._
import scala.concurrent.duration._
import org.junit.{Before, Test, Ignore}
import org.junit.Assert._
import rx.lang.scala.concurrency.NewThreadScheduler
import rx.lang.scala.concurrency.Schedulers
import java.io.IOException

@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
class RxScalaDemo extends JUnitSuite {
Expand Down Expand Up @@ -167,10 +168,10 @@ class RxScalaDemo extends JUnitSuite {

@Test def schedulersExample() {
val o = Observable.interval(100 millis).take(8)
o.observeOn(NewThreadScheduler).subscribe(
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
)
o.observeOn(NewThreadScheduler).subscribe(
o.observeOn(Schedulers.newThread).subscribe(
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
)
waitFor(o)
Expand Down Expand Up @@ -287,7 +288,7 @@ class RxScalaDemo extends JUnitSuite {
// We can't put a general average method into Observable.scala, because Scala's Numeric
// does not have scalar multiplication (we would need to calculate (1.0/numberOfElements)*sum)
def doubleAverage(o: Observable[Double]): Observable[Double] = {
for ((finalSum, finalCount) <- o.fold((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
for ((finalSum, finalCount) <- o.foldLeft((0.0, 0))({case ((sum, count), elem) => (sum+elem, count+1)}))
yield finalSum / finalCount
}

Expand Down Expand Up @@ -321,13 +322,13 @@ class RxScalaDemo extends JUnitSuite {
.toBlockingObservable.foreach(println(_))
}

// source Observables are in a List:
@Test def zipManySeqExample() {
val observables = List(Observable(1, 2), Observable(10, 20), Observable(100, 200))
(for (seq <- Observable.zip(observables)) yield seq.mkString("(", ", ", ")"))
// source Observables are all known:
@Test def zip3Example() {
val o = Observable.zip(Observable(1, 2), Observable(10, 20), Observable(100, 200))
(for ((n1, n2, n3) <- o) yield s"$n1, $n2 and $n3")
.toBlockingObservable.foreach(println(_))
}

// source Observables are in an Observable:
@Test def zipManyObservableExample() {
val observables = Observable(Observable(1, 2), Observable(10, 20), Observable(100, 200))
Expand Down Expand Up @@ -375,6 +376,88 @@ class RxScalaDemo extends JUnitSuite {
assertEquals(Seq(10, 9, 8, 7), Observable(10, 7, 8, 9).toSeq.map(_.sortWith(f)).toBlockingObservable.single)
}

@Test def timestampExample() {
val timestamped = Observable.interval(100 millis).take(3).timestamp.toBlockingObservable
for ((millis, value) <- timestamped if value > 0) {
println(value + " at t = " + millis)
}
}

@Test def materializeExample1() {
def printObservable[T](o: Observable[T]): Unit = {
import Notification._
o.materialize.subscribe(n => n match {
case OnNext(v) => println("Got value " + v)
case OnCompleted() => println("Completed")
case OnError(err) => println("Error: " + err.getMessage)
})
}

val o1 = Observable.interval(100 millis).take(3)
val o2 = Observable(new IOException("Oops"))
printObservable(o1)
waitFor(o1)
printObservable(o2)
waitFor(o2)
}

@Test def materializeExample2() {
import Notification._
Observable(1, 2, 3).materialize.subscribe(n => n match {
case OnNext(v) => println("Got value " + v)
case OnCompleted() => println("Completed")
case OnError(err) => println("Error: " + err.getMessage)
})
}

@Test def elementAtReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).first.toBlockingObservable.single)
}

@Test def elementAtOrDefaultReplacement() {
assertEquals("b", Observable("a", "b", "c").drop(1).firstOrElse("!").toBlockingObservable.single)
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
}

@Test def observableLikeFuture1() {
implicit val scheduler = Schedulers.threadPoolForIO
val o1 = observable {
Thread.sleep(1000)
5
}
val o2 = observable {
Thread.sleep(500)
4
}
Thread.sleep(500)
val t1 = System.currentTimeMillis
println((o1 merge o2).first.toBlockingObservable.single)
println(System.currentTimeMillis - t1)
}

@Test def observableLikeFuture2() {
class Friend {}
val session = new Object {
def getFriends: List[Friend] = List(new Friend, new Friend)
}

implicit val scheduler = Schedulers.threadPoolForIO
val o: Observable[List[Friend]] = observable {
session.getFriends
}
o.subscribe(
friendList => println(friendList),
err => println(err.getMessage)
)

Thread.sleep(1500) // or convert to BlockingObservable
}

@Test def takeWhileWithIndexAlternative {
val condition = true
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)
}

def output(s: String): Unit = println(s)

// blocks until obs has completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,52 @@ import java.{ lang => jlang }
import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx Funcs and Actions.
* Most users RxScala won't need them, but they might be useful if one wants to use
* the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants
* to use a Java library taking/returning Funcs and Actions.
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
* Most RxScala users won't need them, but they might be useful if one wants to use
* the `rx.Observable` directly instead of using `rx.lang.scala.Observable` or if one wants
* to use a Java library taking/returning `Func`s and `Action`s.
*/
object ImplicitFunctionConversions {
import language.implicitConversions

implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
new Func2[rx.Scheduler, T, Subscription] {
def call(s: rx.Scheduler, t: T): Subscription = {
action(s, t)
}
}

implicit def scalaSchedulerToJavaScheduler(s: Scheduler): rx.Scheduler = s.asJava

implicit def javaSchedulerToScalaScheduler(s: rx.Scheduler): Scheduler = Scheduler(s)

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: Observer[_ >: T]): Subscription = {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
f(obs)
}
}

/**
* Converts a by-name parameter to a Rx Func0
*/
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
new Func0[B] {
def call(): B = param
}

/**
* Converts 0-arg function to Rx Action0
*/
implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 =
new Action0 {
def call(): Unit = f()
}

/**
* Converts 1-arg function to Rx Action1
*/
implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] =
new Action1[A] {
def call(a: A): Unit = f(a)
}

/**
* Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
*/
implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] =
new Func1[A, jlang.Boolean] {
def call(a: A): jlang.Boolean = f(a).booleanValue
}

/**
* Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean]
*/
implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] =
new Func2[A, B, jlang.Boolean] {
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
Expand All @@ -79,34 +75,21 @@ object ImplicitFunctionConversions {
def call(args: java.lang.Object*): R = f(args)
}

/**
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
*/
implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] =
new Func2[A, jlang.Integer, jlang.Boolean] {
def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue
}

/**
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
*/
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
new Func2[A, A, jlang.Integer] {
def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue
}

/**
* This implicit allows Scala code to use any exception type and still work
* with invariant Func1 interface
*/
implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] =
new Func1[Exception, B] {
def call(ex: Exception): B = f(ex.asInstanceOf[A])
}

/**
* The following implicits convert functions of different arities into the Rx equivalents
*/
implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] =
new Func0[A] {
def call(): A = f()
Expand Down
Loading