Link Search Menu Expand Document

Flatten operation

Table of Contents

Overview

Flatten operation allows you to convert the stream of iterable elements, like BeanStream<List<T>>, to a stream of the elements themselves BeanStream<T>. It achieves with iterating over the elements in the container and once the container is empty the next one is being read from the stream.

For example, let’s consider the stream of list of integers, and flatten it:

{ [1, 2, 3], [4, 5, 6], [7, 8, 9], ... } ==(flatten)==> { 1, 2, 3, 4, 5, 6, 7, 8, 9, ... }

To perform a flatten operation just call the .flatten() method on the stream. The operation is very useful after the initial stream was windowed, perhaps processed in such batches and the output should be generated as a regualr non-batch stream. The flatten method is available for any stream which elements implement the Iterable<T> interface.

440.sine() // BeanStream<Sample>
    .window(32) // BeanStream<Window<Sample>>
    .map { window ->
        // do something within a batch 
        val a = window.elements.map { it.asDouble() }.average()
        (0 until window.size).map { sampleOf(a) }
    } // BeanStream<List<Sample>>
    .flatten() // BeanStream<Sample>

Flatten operation supports the lists or collections of any sizes, though if it there is an iterable of infinite size it’ll never get to the next element as well as the stream will never end by itself. Also, if it hits on the empty list it’ll be simply skipped.

{ [1, 2], [3], [], [4, 5], [6, 7, 8, 9], [], ... } ==(flatten)==> { 1, 2, 3, 4, 5, 6, 7, 8, 9, ... }

Flattening is performed only on one level, so if the stream is a list of lists or similar, you would need to perform the flattening for each level explicitly:

{ [[1, 2, 3], [4, 5, 6]], [[7, 8, 9]], ... }
    ==(flatten)==> { [1, 2, 3], [4, 5, 6], [7, 8, 9], ... }
    ==(flatten)==> { 1, 2, 3, 4, 5, 6, 7, 8, 9, ... }

Using for the SampleVector

SampleVector, which is technically is just an array of samples, can be flattened as well. Though some operations (i.e. table or wav outputs) support working with it as with usual sample keeping it slightly more optimized than a singular sample, that’s why it better to use such operations instead of flattening.

If that is not an option, just call flatten() method on the stream:

input { (i, _) -> sampleVectorOf(
         sampleOf(sin(i)), 
         sampleOf(cos(i)), 
         sampleOf(sin(i) * cos(i))
    ) } // BeanStream<SampleVector>
    .flatten() // BeanStream<Sample>

The Window is quite different

Window is also some sort of container if iteraable elements, and there is an API to support flattening out of the box using the very same flatten() method.

440.sine() // BeanStream<Sample>
    .window(32) // BeanStream<Window<Sample>>
    .flatten()  // BeanStream<Sample>

Though there is one attribute of window which makes it stand out, and this is a step. Step can make windows overlap between each other, and while flattening you need to resolve that. For that purpose you may specify the overlapResolve function that gets the pair of elements to somehow get a overlapped one, you

val input = input { (i, _) -> i} // BeanStream<Long>
            .window(64, 32) // BeanStream<Window<Long>>

// resolve as a sum of overlapping elements
input.flatten { (a, b) -> a + b } // BeanStream<Long>

// resolve as an average of overlapping elements
input.flatten { (a, b) -> (a + b) / 2L } // BeanStream<Long>

// resolve by taking only the first element
input.flatten { (a, _) -> a } // BeanStream<Long>

The need for the function is checked only in runtime, so if you’re sure it won’t be called you may not specify it. As a general rule specify it if step < size, if step == size || step > size you may omit it.

As a remark, if step > size you’ll see the zero elements of the window in the flattened stream.

Non-iterable types

You may call the flatten() method on each stream which element extends the Iterable<T> interface, but sometimes it is convenient to extract the iterable out of another type, at the same time the map() seems superfluous. For that purpose you may use the flatMap() method specifying the function to provide mapping to iterable entity. It’ll do the mapping and flatten operation at the same time:

input { (i, _) -> Pair(i, i * 2)} // BeanStream<Pair<Long>>
    .flatMap { listOf(it.first, it.second).map { sampleOf(it) }  } // BeanStream<Sample>

// or similar by functionality but not by execution details
input { (i, _) -> Pair(i, i * 2)} // BeanStream<Pair<Long>>
    .map { listOf(it.first, it.second).map { sampleOf(it) }  } // BeanStream<List<Sample>>
    .flatten() // BeanStream<Sample>

The main difference between using map+flatten and flatMap, that in distributed/multi-threaded execution the map+flatten is treated as two separate operations, which requires making sure the object between map and flatten is serializable, and in some case will be actually transferred over the network. Also, at the moment, the flatten is non-parallelized operation, but map is. That all may have some performance impact.