Dataflow mediator for ReactiveStreams
Library ID dataflow-mediator-reactivestreams
Latest version 1.0.0

Field to Publisher conversion

Now we'll explain how to convert a Field to a Publisher. Why not an Attribute? The reason is simple: as already mentioned, once a Publisher enters an error state it terminates and cannot recover, oppositely to an Attribute where the state can change freely.

For this reason the only possible conversion is from a Field to a Publisher (that will never terminate with an error).

The function to do this is an extension of Field and is called toPublisher():

import com.femastudios.dataflow.async.* import com.femastudios.dataflow.async.util.* import com.femastudios.dataflow.* import com.femastudios.dataflow.extensions.* import com.femastudios.dataflow.listen.* import com.femastudios.dataflow.util.* import com.femastudios.dataflow.mediator.reactivestreams.* import io.reactivex.Flowable import org.reactivestreams.Publisher fun main() { val fld : MutableField<Int> = mutableFieldOf(0) //Create a field val publisher : Publisher<Int> = fld.toPublisher() //Converts the Field to a Publisher Flowable .fromPublisher(publisher) //Example for ReactiveX .subscribe { println(it) } fld.increment() fld.increment() fld.increment() }
MutableField<Integer> attribute = MutableField.of(0); //Create a field Attribute<Integer> attribute = DataflowReactorMediation.toPublisher(attribute); //Converts the Field to a Publisher Flowable .fromPublisher(publisher) //Example for ReactiveX .subscribe(value -> { System.out.println(value); }); fld.setValue(1); fld.setValue(2); fld.setValue(3);

When the value of the Field changes and there is backpressure, the new value is emitted.

When there is not backpressure no changes except the last one are emitted. The latest value is emitted the next time a subscriber requests data from the Publisher.