Available pipeline (Transformation and Action) functions in PrivacyStreams

Transformations

Input,Output API & Description
PStream,PStream filter(Function<Item,Boolean> itemChecker)
Filter the stream by testing each item with a function. Specifically, keep the items that satisfy the function (i.e. the function returns true). Eg. filter(eq("x", 100)) will keep the items whose x field is equal to 100.
- itemChecker: the function to check each item.
PStream,PStream filter(String fieldName, TValue fieldValue)
Filter the stream by checking whether a field equals a value. Specifically, keep the items in which the field equals the given value. Eg. filter("x", 100) will keep the items whose x field is equal to 100.
- fieldName: the name of field to check
- fieldValue: the value to compare with the field
PStream,PStream groupBy(String groupField)
Group the items according to a field. After grouping, the items in the new stream will have only two fields: the field used for grouping, and “grouped_items”, which is a list of the grouped items. Eg. groupBy("x") will group the items with the same “x” field, and each item in the stream after grouping will contain two fields: “x” and “grouped_items”.
- groupField: the field used to group the items in current stream.
This transformation will change the order of items.
PStream,PStream inFixedInterval(long fixedInterval)
Send the items at a fixed interval. Eg. If a stream has items sent at 1ms, 3ms, 7ms, 11ms and 40ms, inFixedInterval(10) will send the items from 7ms, 11ms, 11ms and 40ms, one every 10ms, which matches sending the most recent item at each 10ms tick.
- fixedInterval: the fixed interval in milliseconds.
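
The inFixedInterval example can be reproduced with a small plain-Java simulation. This is only a sketch of one reading of that example, not PrivacyStreams code: it assumes ticks start at one interval after time 0 and that each tick re-sends the most recent item seen so far. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for inFixedInterval; items are represented by their send times (ms).
class FixedIntervalSketch {

    // Assumption: at every tick (interval, 2*interval, ...) emit the latest item sent at or before the tick.
    static List<Long> inFixedInterval(List<Long> sortedTimestamps, long interval) {
        List<Long> emitted = new ArrayList<>();
        if (sortedTimestamps.isEmpty() || interval <= 0) {
            return emitted;
        }
        long end = sortedTimestamps.get(sortedTimestamps.size() - 1);
        int next = 0;
        Long latest = null;
        for (long tick = interval; tick <= end; tick += interval) {
            // Advance past every item that arrived up to this tick.
            while (next < sortedTimestamps.size() && sortedTimestamps.get(next) <= tick) {
                latest = sortedTimestamps.get(next);
                next++;
            }
            if (latest != null) {
                emitted.add(latest);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(inFixedInterval(Arrays.asList(1L, 3L, 7L, 11L, 40L), 10)); // prints [7, 11, 11, 40]
    }
}
```

Note that the item sent at 11ms appears twice because no newer item arrives between the 20ms and 30ms ticks.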
PStream,PStream keepChanges()
Only keep the items that are different from the previous ones in the stream. Eg. a stream [1, 1, 2, 2, 2, 1, 1] will be [1, 2, 1] after keepChanges()
PStream,PStream keepChanges(String fieldName)
Only keep the items whose given field is different from that of the previous item in the stream. Similar to keepChanges(), but only monitors a certain field.
- fieldName: the name of field to check whether an item should be kept
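
The keepChanges() behaviour can be illustrated on an ordinary list. The following is a plain-Java sketch of the semantics described above, not the library's implementation; KeepChangesSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for keepChanges(), operating on a list instead of a stream.
class KeepChangesSketch {

    // Keeps an element only when it differs from the element immediately before it.
    static <T> List<T> keepChanges(List<T> items) {
        List<T> kept = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : items) {
            if (first || !Objects.equals(item, previous)) {
                kept.add(item);
            }
            previous = item;
            first = false;
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keepChanges(Arrays.asList(1, 1, 2, 2, 2, 1, 1))); // prints [1, 2, 1]
    }
}
```

The keepChanges(String fieldName) variant would compare only the named field of consecutive items instead of the whole item.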
PStream,PStream limit(Function<Item,Boolean> itemChecker)
Limit the stream by checking each item with a function. Specifically, keep the stream running as long as the checker holds (i.e. the checker function returns true). Eg. limit(eq("x", 100)) will keep all items in the stream as long as the x field equals 100; once an item’s x value is not equal to 100, the stream stops.
- itemChecker: the function to check each item.
PStream,PStream limit(int maxCount)
Limit the stream with a max number of items. Specifically, stop the stream if the count of items exceeds the threshold. Eg. limit(10) will limit the stream to at most 10 items
- maxCount: the max number of items
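
The difference between filter and the two limit variants is easy to miss: filter skips non-matching items and continues, while limit stops the stream at the first failure. A plain-Java sketch of that contrast follows. It works on lists rather than a PStream, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical list-based stand-ins for filter(itemChecker), limit(itemChecker) and limit(maxCount).
class LimitSketch {

    // filter analogue: drops failing items but keeps scanning.
    static <T> List<T> filter(List<T> items, Predicate<T> checker) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (checker.test(item)) {
                out.add(item);
            }
        }
        return out;
    }

    // limit(itemChecker) analogue: stops at the first item that fails the check.
    static <T> List<T> limitWhile(List<T> items, Predicate<T> checker) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!checker.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // limit(maxCount) analogue: stops once maxCount items have been kept.
    static <T> List<T> limitCount(List<T> items, int maxCount) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (out.size() >= maxCount) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(100, 100, 7, 100);
        System.out.println(filter(xs, v -> v == 100));     // prints [100, 100, 100]
        System.out.println(limitWhile(xs, v -> v == 100)); // prints [100, 100]
        System.out.println(limitCount(xs, 3));             // prints [100, 100, 7]
    }
}
```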
PStream,PStream localGroupBy(String groupField)
Group the contiguous items according to a field. After grouping, the items in the new stream will have only two fields: the field used for grouping, and “grouped_items”, which is a list of the grouped items. Eg. localGroupBy("x") will group the contiguous items with the same “x” field, and each item in the stream after grouping will contain two fields: “x” and “grouped_items”.
- groupField: the field used to group the items in current stream.
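
To make the difference between groupBy and localGroupBy concrete, here is a plain-Java sketch that models items as maps. It is an illustration under assumptions, not library code: the class, the helper names, and the choice to emit groups in order of first appearance are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical map-based stand-ins for groupBy and localGroupBy.
class GroupingSketch {

    // Builds a test item with fields "x" and "y".
    static Map<String, Object> item(Object x, Object y) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("x", x);
        m.put("y", y);
        return m;
    }

    // A grouped item has exactly two fields: the grouping field and "grouped_items".
    static Map<String, Object> group(String field, Object key, List<Map<String, Object>> members) {
        Map<String, Object> g = new LinkedHashMap<>();
        g.put(field, key);
        g.put("grouped_items", members);
        return g;
    }

    // groupBy analogue: all items sharing a field value end up in one group.
    static List<Map<String, Object>> groupBy(List<Map<String, Object>> items, String field) {
        Map<Object, List<Map<String, Object>>> buckets = new LinkedHashMap<>();
        for (Map<String, Object> it : items) {
            buckets.computeIfAbsent(it.get(field), k -> new ArrayList<>()).add(it);
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map.Entry<Object, List<Map<String, Object>>> e : buckets.entrySet()) {
            out.add(group(field, e.getKey(), e.getValue()));
        }
        return out;
    }

    // localGroupBy analogue: only contiguous runs of equal field values are grouped.
    static List<Map<String, Object>> localGroupBy(List<Map<String, Object>> items, String field) {
        List<Map<String, Object>> out = new ArrayList<>();
        List<Map<String, Object>> run = null;
        Object runKey = null;
        for (Map<String, Object> it : items) {
            Object key = it.get(field);
            if (run == null || !Objects.equals(key, runKey)) {
                run = new ArrayList<>();
                runKey = key;
                out.add(group(field, key, run));
            }
            run.add(it);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> items =
                Arrays.asList(item("a", 1), item("a", 2), item("b", 3), item("a", 4));
        System.out.println(groupBy(items, "x").size());      // prints 2
        System.out.println(localGroupBy(items, "x").size()); // prints 3
    }
}
```

With the x values a, a, b, a, the global grouping yields two groups, while the contiguous grouping yields three because the final a starts a new run.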
PStream,PStream logAs(String logTag)
Print the items in current stream.
- logTag: the log tag to use in printing current stream
PStream,PStream logOverSocket(String logTag)
Print the items in current stream over socket.
- logTag: the log tag to use in printing current stream
PStream,PStream map(Function<Item,Item> itemConverter)
Convert each item in the stream with a function. Eg. map(ItemOperators.setField("x", 10)) will set the “x” field of each item to 10 in the stream.
- itemConverter: the function to map each item to another item
PStream,PStream project(String... fieldsToInclude)
Project each item by including some fields. Other fields will be removed. Eg. project("name", "email") will only keep the “name” and “email” fields in each item.
- fieldsToInclude: the fields to include
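
The effect of project on a single item can be sketched in plain Java with a map standing in for an Item. The class name and the decision to silently skip fields that are absent are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical map-based stand-in for project(...).
class ProjectSketch {

    // Returns a copy of the item containing only the listed fields.
    static Map<String, Object> project(Map<String, Object> item, String... fieldsToInclude) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : fieldsToInclude) {
            if (item.containsKey(field)) { // assumption: missing fields are skipped
                out.put(field, item.get(field));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> contact = new LinkedHashMap<>();
        contact.put("name", "Alice");
        contact.put("email", "alice@example.org");
        contact.put("phone", "555-0100");
        System.out.println(project(contact, "name", "email")); // prints {name=Alice, email=alice@example.org}
    }
}
```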
PStream,PStream reuse(int numOfReuses)
Reuse current stream.
- numOfReuses: number of reuses
PStream,PStream reverse()
Reverse the order of items. Eg. reverse() will reverse the order of the items in the stream.
This transformation will change the order of items.
PStream,PStream sampleByCount(int stepCount)
Sample the items based on a given step count. Items are filtered so that stepCount items are dropped between every two kept items. Eg. sampleByCount(2) will keep the 1st, 4th, 7th, 10th, … items
- stepCount: the number of items to drop after each kept item
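
The sampleByCount arithmetic (keep one item, drop stepCount, repeat) can be checked with a short plain-Java sketch. It works on a list instead of a stream, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based stand-in for sampleByCount(stepCount).
class SampleByCountSketch {

    // Keeps index 0, then every (stepCount + 1)-th index after it.
    static <T> List<T> sampleByCount(List<T> items, int stepCount) {
        if (stepCount < 0) {
            throw new IllegalArgumentException("stepCount must be >= 0");
        }
        List<T> kept = new ArrayList<>();
        for (int i = 0; i < items.size(); i += stepCount + 1) {
            kept.add(items.get(i));
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sampleByCount(items, 2)); // prints [1, 4, 7, 10]
    }
}
```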
PStream,PStream sampleByInterval(long minInterval)
Sample the items based on a given interval. Items sent within minInterval of the last kept item are dropped. Eg. If a stream has items sent at 1ms, 3ms, 7ms, 11ms and 40ms, sampleByInterval(10) will only keep the items sent at 1ms, 11ms and 40ms.
- minInterval: the minimum interval (in milliseconds) between each two items.
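
The sampleByInterval example can be traced with a plain-Java sketch in which each item is represented by its send time in milliseconds. This is an illustration rather than the library's code, and the class name is hypothetical. The example implies that an item arriving exactly minInterval after the last kept item is kept (11ms follows 1ms by exactly 10ms), so the sketch uses a greater-or-equal comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for sampleByInterval(minInterval); items are send times in ms.
class SampleByIntervalSketch {

    // Keeps an item only if at least minInterval has passed since the last kept item.
    static List<Long> sampleByInterval(List<Long> timestamps, long minInterval) {
        List<Long> kept = new ArrayList<>();
        Long lastKept = null;
        for (Long t : timestamps) {
            if (lastKept == null || t - lastKept >= minInterval) {
                kept.add(t);
                lastKept = t;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(sampleByInterval(Arrays.asList(1L, 3L, 7L, 11L, 40L), 10)); // prints [1, 11, 40]
    }
}
```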
PStream,PStream setField(String fieldToSet, Function<Item,TValue> fieldValueComputer)
Set a field to a new value for each item in the stream. The value is computed with a function that takes the item as input. Eg. setField("x", Comparators.gt("y", 10)) will set a new boolean field “x” on each item, which indicates whether the “y” field is greater than 10.
- fieldToSet: the name of the field to set.
- fieldValueComputer: the function to compute the value of the new field based on each item.
- <TValue>: the type of the new field value
PStream,PStream setGroupField(String fieldToSet, Function<List<Item>,TValue> fieldValueComputer)
Set a field to a new value for each item in the stream. This transformation can only be used after invoking a grouping method (groupBy or localGroupBy). The value is computed at runtime with a function that takes the grouped items as input. Eg. setGroupField("count", StatisticOperators.count()) will set a new field “count” on each item, which holds the number of items in that group.
- fieldToSet: the name of the field to set.
- fieldValueComputer: the function to compute the new field value, which takes the list of grouped items as input.
- <TValue>: the type of the new field value
PStream,PStream setIndependentField(String fieldToSet, Function<Void,TValue> valueGenerator)
Set the value of a new field with a value generator function. The value generator function is independent of the current item, so it does not need an input (the input type is Void). The value generator will be evaluated on demand at runtime. Eg. setIndependentField("time", TimeOperators.getCurrentTime()) will set the field “time” to a timestamp in each item; setIndependentField("wifiStatus", DeviceOperators.isWifiConnected()) will set the field “wifiStatus” to a boolean indicating whether wifi is connected in each item.
- fieldToSet: the name of the field to set.
- valueGenerator: the function to compute the field value.
- <TValue>: the type of the new field value.
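
The three field-setting transformations above differ only in what the value function receives: the item itself, the list of grouped items, or nothing. The following plain-Java sketch shows that distinction with maps standing in for items. The class and its methods are hypothetical and do not use the PrivacyStreams API; in particular, returning a modified copy is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical map-based stand-ins for setField, setGroupField and setIndependentField.
class SetFieldSketch {

    // setField analogue: the value is computed from the item itself.
    static Map<String, Object> setField(Map<String, Object> item, String field,
                                        Function<Map<String, Object>, Object> computer) {
        Map<String, Object> copy = new LinkedHashMap<>(item);
        copy.put(field, computer.apply(item));
        return copy;
    }

    // setGroupField analogue: the value is computed from the "grouped_items" list.
    @SuppressWarnings("unchecked")
    static Map<String, Object> setGroupField(Map<String, Object> groupItem, String field,
                                             Function<List<Map<String, Object>>, Object> computer) {
        List<Map<String, Object>> grouped =
                (List<Map<String, Object>>) groupItem.get("grouped_items");
        Map<String, Object> copy = new LinkedHashMap<>(groupItem);
        copy.put(field, computer.apply(grouped));
        return copy;
    }

    // setIndependentField analogue: the generator takes no input.
    static Map<String, Object> setIndependentField(Map<String, Object> item, String field,
                                                   Supplier<Object> generator) {
        Map<String, Object> copy = new LinkedHashMap<>(item);
        copy.put(field, generator.get());
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> item = new LinkedHashMap<>();
        item.put("y", 42);
        System.out.println(setField(item, "x", it -> (Integer) it.get("y") > 10)); // prints {y=42, x=true}

        Map<String, Object> group = new LinkedHashMap<>();
        group.put("grouped_items", Arrays.asList(item, item, item));
        System.out.println(setGroupField(group, "count", List::size).get("count")); // prints 3

        // The timestamp differs on every run, so no expected output is shown.
        System.out.println(setIndependentField(item, "time", System::currentTimeMillis));
    }
}
```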
PStream,PStream shuffle()
Shuffle the items. Eg. shuffle() will randomize the order of the items in the stream.
This transformation will change the order of items.
PStream,PStream sortBy(String fieldName)
Sort the items according to the value of a field, in ascending order. Eg. sortBy("timestamp") will sort the items in the stream by timestamp field.
- fieldName: the field used to sort the items in current stream, in ascending order
This transformation will change the order of items.
PStream,PStream timeout(long timeoutMilliseconds)
Limit the stream with a timeout: the stream stops once the timeout elapses. Eg. timeout(Duration.seconds(10)) will limit the stream to at most 10 seconds
- timeoutMilliseconds: the timeout in milliseconds
PStream,PStream unGroup(String unGroupField, String newField)
Un-group a list field in each item to multiple items. Each element in the list becomes the value of the new field in one item of the new stream. After un-grouping, the items in the new stream will have the same number of fields as the items in the original stream, but the list field (unGroupField) will be replaced by a new field (newField). Eg. unGroup("emails", "email") will un-group the “emails” field (which is a list) in an item into several new items, each with an “email” field.
- unGroupField: the field to un-group, whose value should be a list
- newField: the new field name in the new stream
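
The unGroup example can be sketched in plain Java with maps standing in for items. This is an illustration only: the class name is hypothetical, and skipping items whose unGroupField is not a list is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical map-based stand-in for unGroup(unGroupField, newField).
class UnGroupSketch {

    // Emits one item per list element; the list field is replaced by newField.
    static List<Map<String, Object>> unGroup(List<Map<String, Object>> items,
                                             String unGroupField, String newField) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> item : items) {
            Object value = item.get(unGroupField);
            if (!(value instanceof List)) {
                continue; // assumption: items without a list value are skipped
            }
            for (Object element : (List<?>) value) {
                Map<String, Object> copy = new LinkedHashMap<>(item);
                copy.remove(unGroupField);
                copy.put(newField, element);
                out.add(copy);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> contact = new LinkedHashMap<>();
        contact.put("name", "Alice");
        contact.put("emails", Arrays.asList("a@example.org", "b@example.org"));
        System.out.println(unGroup(Collections.singletonList(contact), "emails", "email"));
        // prints [{name=Alice, email=a@example.org}, {name=Alice, email=b@example.org}]
    }
}
```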

Actions

Input,Output API & Description
PStream,Item getFirst()
Get the first item in the stream.
This action will block current thread.
PStream,Item getItemAt(int index)
Pick the N-th item in the stream. N is the index.
- index: the index of target item.
This action will block current thread.
PStream,List<Item> asList()
Collect the items in the stream to a list. Each item in the list is an instance of Item.
This action will block current thread.
PStream,List<TValue> asList(String fieldToSelect)
Select a field in each item and output the field values to a list.
- fieldToSelect: the field to select
- <TValue>: the type of field value
This action will block current thread.
PStream,TValue getFieldAt(String fieldName, int index)
Get the N-th value of a given field. N is the index.
- fieldName: the name of the field to select
- index: the index of target item.
This action will block current thread.
PStream,TValue getFirst(String fieldName)
Get the first value of the given field in the stream.
This action will block current thread.
PStream,Tout output(Function<List<Item>,Tout> itemsCollector)
Output the items in the stream with a function. This method will block until the result returns. Eg. output(StatisticOperators.count()) will output the number of items.
- itemsCollector: the function used to output current stream
- <Tout>: the type of the result
This action will block current thread.
PStream,int count()
Count the number of items.
This action will block current thread.
PStream,void debug()
Print the items for debugging.
This action will NOT block current thread.
PStream,void forEach(Function<Item,Void> callback)
Callback with each item.
- callback: the callback to invoke for each item.
This action will NOT block current thread.
PStream,void forEach(String fieldToSelect, Function<TValue,Void> callback)
Callback with a certain field of each item.
- fieldToSelect: the name of the field to callback with
- callback: the callback to invoke for each item field
- <TValue>: the type of the field
This action will NOT block current thread.
PStream,void idle()
Do nothing with the items.
This action will NOT block current thread.
PStream,void ifPresent(Function<Item,Void> callback)
Callback with an item once one item is present.
- callback: the callback to invoke once the item is present
This action will NOT block current thread.
PStream,void ifPresent(String fieldToSelect, Function<TValue,Void> callback)
Callback with a field value of an item once the field value is present.
- fieldToSelect: the name of the field to callback with
- callback: the callback to invoke once the field value is present
- <TValue>: the type of the field
This action will NOT block current thread.
PStream,void onChange(Function<Item,Void> callback)
Callback with an item when the item changes (is different from the previous item).
- callback: the callback to invoke for the changed item
This action will NOT block current thread.
PStream,void onChange(String fieldToSelect, Function<TValue,Void> callback)
Callback with a field value of an item when the field value changes.
- fieldToSelect: the name of the field to callback with
- callback: the callback to invoke for the changed item field
- <TValue>: the type of the field
This action will NOT block current thread.
PStream,void output(Function<List<Item>,Tout> resultComputer, Callback<Tout> resultHandler)
Output the items in the stream with a function, and pass the result to a callback. This method does not block. Eg. output(StatisticOperators.count(), new Callback<Integer>(){...}) will count the number of items and invoke the callback with the number.
- resultComputer: the function used to compute result based on the items in current stream
- resultHandler: the function to handle the result
- <Tout>: the type of the result
This action will NOT block current thread.
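
The two output variants illustrate the general split between blocking and non-blocking actions. The following plain-Java sketch mimics that split with CompletableFuture. It is not how PrivacyStreams implements actions, and the class name, the use of Consumer in place of Callback, and the returned future (included only so the demo can wait for the callback) are all assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical list-based stand-ins for the blocking and callback forms of output.
class OutputSketch {

    // Blocking form: the caller waits for and receives the result.
    static <T, R> R output(List<T> items, Function<List<T>, R> itemsCollector) {
        return itemsCollector.apply(items);
    }

    // Non-blocking form: the result is computed on another thread and passed to the handler.
    static <T, R> CompletableFuture<Void> output(List<T> items,
                                                 Function<List<T>, R> resultComputer,
                                                 Consumer<R> resultHandler) {
        return CompletableFuture.supplyAsync(() -> resultComputer.apply(items))
                .thenAccept(resultHandler);
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        Function<List<String>, Integer> count = List::size;

        System.out.println(output(items, count)); // prints 3

        // join() is used here only so the demo does not exit before the callback runs.
        output(items, count, n -> System.out.println("callback got " + n)).join();
    }
}
```

Blocking actions such as asList() or count() should therefore be kept off any thread that must stay responsive, while the callback forms return immediately.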