Output as a function
Sometimes you may want to process the output in a way the framework does not provide, e.g. write a custom file format, send it to a remote network endpoint, or just store it in memory. For that purpose you may implement the output as a function. It is as easy as calling .out() on a stream of any type:
stream.out { /* handle the samples your own way */ }
The argument of the function is of type io.wavebeans.lib.io.WriteFunctionArgument, which has the following fields:
- sampleClazz (KClass<T>) – the class of the sample, for convenience.
- sampleIndex (Long) – the 0-based global index of the sample.
- sampleRate (Float) – the sample rate the output is being evaluated with.
- sample (T?) – the sample value; whether it is null depends on the phase.
- phase (WriteFunctionPhase) – the phase of the writing routine: WRITE, END, CLOSE.

The phase describes where the writer currently is:
- WRITE tells that the writer is currently receiving the input signal and expects the function to process it. The sample field is never null in this case.
- END tells that the writer has reached the end of the input stream but has been called anyway. It may not be called at all in some cases (for example, the writer's write function stops being called before the writer reaches the end of the stream, or the stream is endless), or it may be called more than once (if the writer's write function is called again after a previous call returned false), but during regular execution it is called only once. The sample field is null in this case.
- CLOSE tells that the writer is being closed. The sample field is null in this case.
The function is expected to return a Boolean value that controls the behavior of the output writer:
- In the WRITE phase, if the function returns true the writer continues processing the input; if it returns false the writer stops processing, but the CLOSE phase is initiated anyway.
- In the other phases the return value doesn't affect anything.
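The phase sequence and the effect of the return value can be modelled without the library. The following is a minimal sketch, not the real writer: DemoPhase, DemoArgument, runWriter and collectPhases are hypothetical stand-ins that only mirror the fields and rules described above, and the driver covers just the regular case where END is called at most once.

```kotlin
// Hypothetical stand-ins for the library types; they only mirror the
// fields described above and are not part of the framework.
enum class DemoPhase { WRITE, END, CLOSE }

data class DemoArgument<T>(
    val sampleIndex: Long,
    val sampleRate: Float,
    val sample: T?,
    val phase: DemoPhase
)

// Hypothetical driver imitating the described contract (regular execution only):
// WRITE for every sample until the function returns false, END only if the
// input was exhausted, and CLOSE in any case.
fun <T> runWriter(input: List<T>, sampleRate: Float, fn: (DemoArgument<T>) -> Boolean) {
    var index = 0L
    var stopped = false
    for (sample in input) {
        if (!fn(DemoArgument(index, sampleRate, sample, DemoPhase.WRITE))) {
            stopped = true
            break
        }
        index++
    }
    if (!stopped) fn(DemoArgument(index, sampleRate, null, DemoPhase.END))
    fn(DemoArgument(index, sampleRate, null, DemoPhase.CLOSE))
}

// Records which phases the function has seen; stops after `limit` samples.
fun collectPhases(input: List<Int>, limit: Int): List<String> {
    val log = ArrayList<String>()
    runWriter(input, 44100.0f) { arg ->
        when (arg.phase) {
            DemoPhase.WRITE -> {
                log += "WRITE ${arg.sample}"
                // keep going only while fewer than `limit` samples were handled
                arg.sampleIndex + 1 < limit
            }
            // the return value is ignored outside of the WRITE phase
            DemoPhase.END -> { log += "END"; true }
            DemoPhase.CLOSE -> { log += "CLOSE"; true }
        }
    }
    return log
}
```

In this model, collectPhases(listOf(1, 2, 3), 2) stops early and records WRITE 1, WRITE 2, CLOSE, while collectPhases(listOf(1, 2, 3), 10) consumes the whole input and records all three WRITE calls followed by END and CLOSE.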
Here is an example that writes into a shared in-memory storage; it stores 1 second of a 440Hz sine:
/**
 * It's not a proper storage, just to provide an idea.
 * It is an object to be able to use function as lambda.
 */
object Storage {
    private val list = ArrayList<Sample>()
    fun add(sample: Sample) { list += sample }
    fun list(): List<Sample> = list
}

440.sine() // the stream is infinite, but we'll limit it in the output function
    .out {
        // write only samples within WRITE phase
        if (it.phase == WRITE) Storage.add(it.sample!!)
        // limit with one second of data
        it.sampleIndex / it.sampleRate < 1.0f
    }
Running in multi-threaded or distributed mode: by default outputs are evaluated as a single bean and are not parallelized, and the function as an output is no exception. That means the output function can safely keep some state, though it is not guaranteed to be launched in the very same thread every time. One more thing: if the stream is evaluated sequentially a few times in a row within the same process, the function is created only once, so the state should take this into account.
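One way to account for the function instance being reused is to reset its state in the CLOSE phase. The following is a minimal sketch under stated assumptions: SketchPhase, SketchArgument, RunningSum and simulateTwoRuns are hypothetical names that only mirror the argument described above, the two runs are simulated by hand rather than by the framework, and @Synchronized is a conservative choice for the case where calls arrive from different threads.

```kotlin
// Hypothetical stand-ins mirroring the argument described above; not the library types.
enum class SketchPhase { WRITE, END, CLOSE }

data class SketchArgument(val sampleIndex: Long, val sample: Double?, val phase: SketchPhase)

// A stateful output function: accumulates a sum during WRITE and
// publishes and resets it on CLOSE, so a reused instance starts clean.
class RunningSum : (SketchArgument) -> Boolean {
    private var sum = 0.0
    val results = ArrayList<Double>() // one entry per completed run

    // Calls are sequential but may come from different threads,
    // so the state is guarded to keep it visible across threads.
    @Synchronized
    override fun invoke(arg: SketchArgument): Boolean {
        when (arg.phase) {
            SketchPhase.WRITE -> sum += arg.sample!!
            SketchPhase.END -> {}
            SketchPhase.CLOSE -> {
                results += sum
                sum = 0.0 // reset, the same instance may serve the next run
            }
        }
        return true
    }
}

// Simulates two sequential evaluations that share one function instance.
fun simulateTwoRuns(): List<Double> {
    val fn = RunningSum()
    repeat(2) {
        listOf(1.0, 2.0, 3.0).forEachIndexed { i, s ->
            fn(SketchArgument(i.toLong(), s, SketchPhase.WRITE))
        }
        fn(SketchArgument(3L, null, SketchPhase.END))
        fn(SketchArgument(3L, null, SketchPhase.CLOSE))
    }
    return fn.results
}
```

With the reset in place both simulated runs yield 6.0; without it the second run would carry over the first run's sum and yield 12.0.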