Field to Publisher conversion
Now we'll explain how to convert a Field
to a Publisher
. Why not an Attribute
? The reason is simple: as already mentioned, once a Publisher
enters an error state it terminates and cannot recover, oppositely to an Attribute
where the state can change freely.
For this reason the only possible conversion is from a Field
to a Publisher
(that will never terminate with an error).
The function to do this is an extension of Field
and is called toPublisher()
:
import com.femastudios.dataflow.async.*
import com.femastudios.dataflow.async.util.*
import com.femastudios.dataflow.*
import com.femastudios.dataflow.extensions.*
import com.femastudios.dataflow.listen.*
import com.femastudios.dataflow.util.*
import com.femastudios.dataflow.mediator.reactivestreams.*
import io.reactivex.Flowable
import org.reactivestreams.Publisher
fun main() {
val fld : MutableField<Int> = mutableFieldOf(0) //Create a field
val publisher : Publisher<Int> = fld.toPublisher() //Converts the Field to a Publisher
Flowable
.fromPublisher(publisher) //Example for ReactiveX
.subscribe {
println(it)
}
fld.increment()
fld.increment()
fld.increment()
}
MutableField<Integer> attribute = MutableField.of(0); //Create a field
Attribute<Integer> attribute = DataflowReactorMediation.toPublisher(attribute); //Converts the Field to a Publisher
Flowable
.fromPublisher(publisher) //Example for ReactiveX
.subscribe(value -> {
System.out.println(value);
});
fld.setValue(1);
fld.setValue(2);
fld.setValue(3);
When the value of the Field
changes and there is backpressure, the new value is emitted.
When there is not backpressure no changes except the last one are emitted. The latest value is emitted the next time a subscriber requests data from the Publisher
.