Available pipeline (Transformation and Action) functions in PrivacyStreams
Transformations
Input,Output | API & Description |
---|---|
PStream,PStream | `filter(Function<Item,Boolean> itemChecker)` Filter the stream by testing each item with a function: keep the items that satisfy the function (i.e., for which it returns true). E.g., `filter(eq("x", 100))` keeps the items whose `x` field equals 100. Parameters: `itemChecker` (the function used to check each item). |
PStream,PStream | `filter(String fieldName, TValue fieldValue)` Filter the stream by checking whether a field equals a value: keep the items in which the field equals the given value. E.g., `filter("x", 100)` keeps the items whose `x` field equals 100. Parameters: `fieldName` (the name of the field to check), `fieldValue` (the value to compare the field with). |
PStream,PStream | `groupBy(String groupField)` Group the items according to a field. After grouping, each item in the new stream has only two fields: the field used for grouping, and `grouped_items`, a list of the grouped items. E.g., `groupBy("x")` groups the items that have the same `x` value, and each item in the resulting stream contains two fields: `x` and `grouped_items`. Parameters: `groupField` (the field used to group the items in the current stream). This transformation will change the order of items. |
PStream,PStream | `inFixedInterval(long fixedInterval)` Send the items at a fixed interval. E.g., if a stream has items sent at 1ms, 3ms, 7ms, 11ms and 40ms, `inFixedInterval(10)` sends, one per 10ms interval, the items originally sent at 7ms, 11ms, 11ms and 40ms. Parameters: `fixedInterval` (the fixed interval in milliseconds). |
PStream,PStream | `keepChanges()` Only keep the items that differ from the previous item in the stream. E.g., the stream [1, 1, 2, 2, 2, 1, 1] becomes [1, 2, 1] after `keepChanges()`. |
PStream,PStream | `keepChanges(String fieldName)` Only keep the items whose given field differs from that of the previous item in the stream. Similar to `keepChanges()`, but only monitors a certain field. Parameters: `fieldName` (the name of the field used to decide whether an item should be kept). |
PStream,PStream | `limit(Function<Item,Boolean> itemChecker)` Limit the stream by checking each item with a function: the stream continues as long as the checker holds (i.e., returns true). E.g., `limit(eq("x", 100))` keeps all items as long as their `x` field equals 100; once an item's `x` value is not 100, the stream stops. Parameters: `itemChecker` (the function used to check each item). |
PStream,PStream | `limit(int maxCount)` Limit the stream to a maximum number of items: the stream stops once the count of items exceeds the threshold. E.g., `limit(10)` limits the stream to at most 10 items. Parameters: `maxCount` (the maximum number of items). |
PStream,PStream | `localGroupBy(String groupField)` Group contiguous items according to a field. After grouping, each item in the new stream has only two fields: the field used for grouping, and `grouped_items`, a list of the grouped items. E.g., `localGroupBy("x")` groups contiguous items that have the same `x` value, and each item in the resulting stream contains two fields: `x` and `grouped_items`. Parameters: `groupField` (the field used to group the items in the current stream). |
PStream,PStream | `logAs(String logTag)` Print the items in the current stream. Parameters: `logTag` (the log tag to use when printing the current stream). |
PStream,PStream | `logOverSocket(String logTag)` Print the items in the current stream over a socket. Parameters: `logTag` (the log tag to use when printing the current stream). |
PStream,PStream | `map(Function<Item,Item> itemConverter)` Convert each item in the stream with a function. E.g., `map(ItemOperators.setField("x", 10))` sets the `x` field of each item in the stream to 10. Parameters: `itemConverter` (the function that maps each item to another item). |
PStream,PStream | `project(String... fieldsToInclude)` Project each item onto the given fields; all other fields are removed. E.g., `project("name", "email")` keeps only the `name` and `email` fields in each item. Parameters: `fieldsToInclude` (the fields to include). |
PStream,PStream | `reuse(int numOfReuses)` Reuse the current stream. Parameters: `numOfReuses` (the number of reuses). |
PStream,PStream | `reverse()` Reverse the order of the items in the stream. This transformation will change the order of items. |
PStream,PStream | `sampleByCount(int stepCount)` Sample the items by a given step count: after each kept item, the next `stepCount` items are dropped. E.g., `sampleByCount(2)` keeps the 1st, 4th, 7th, 10th, … items. Parameters: `stepCount` (the number of items to drop after each kept item). |
PStream,PStream | `sampleByInterval(long minInterval)` Sample the items by a given interval: items sent within `minInterval` milliseconds of the last kept item are dropped. E.g., if a stream has items sent at 1ms, 3ms, 7ms, 11ms and 40ms, `sampleByInterval(10)` keeps only the items sent at 1ms, 11ms and 40ms. Parameters: `minInterval` (the minimum interval, in milliseconds, between two kept items). |
PStream,PStream | `setField(String fieldToSet, Function<Item,TValue> fieldValueComputer)` Set a field to a new value for each item in the stream. The value is computed by a function that takes the item as input. E.g., `setField("x", Comparators.gt("y", 10))` sets a new boolean field `x` on each item, indicating whether the item's `y` field is greater than 10. Parameters: `fieldToSet` (the name of the field to set), `fieldValueComputer` (the function that computes the new field's value from each item), `<TValue>` (the type of the new field value). |
PStream,PStream | `setGroupField(String fieldToSet, Function<List<Item>,TValue> fieldValueComputer)` Set a field to a new value for each item in the stream. This transformation can only be used after a grouping method (`groupBy`, `localGroupBy`). The value is computed at runtime by a function that takes the grouped items as input. E.g., `setGroupField("count", StatisticOperators.count())` sets a new field `count` on each item, holding the number of items in its group. Parameters: `fieldToSet` (the name of the field to set), `fieldValueComputer` (the function that computes the new field value from the list of grouped items), `<TValue>` (the type of the new field value). |
PStream,PStream | `setIndependentField(String fieldToSet, Function<Void,TValue> valueGenerator)` Set the value of a new field with a value generator function. The generator is independent of the current item and needs no input (its input type is `Void`); it is evaluated on demand at runtime. E.g., `setIndependentField("time", TimeOperators.getCurrentTime())` sets the field `time` in each item to a timestamp; `setIndependentField("wifiStatus", DeviceOperators.isWifiConnected())` sets the field `wifiStatus` in each item to a boolean indicating whether Wi-Fi is connected. Parameters: `fieldToSet` (the name of the field to set), `valueGenerator` (the function that computes the field value), `<TValue>` (the type of the new field value). |
PStream,PStream | `shuffle()` Shuffle the items: `shuffle()` randomizes the order of the items in the stream. This transformation will change the order of items. |
PStream,PStream | `sortBy(String fieldName)` Sort the items by the value of a field, in ascending order. E.g., `sortBy("timestamp")` sorts the items in the stream by their `timestamp` field. Parameters: `fieldName` (the field used to sort the items in the current stream, in ascending order). This transformation will change the order of items. |
PStream,PStream | `timeout(long timeoutMilliseconds)` Limit the stream with a timeout: the stream stops once the timeout expires. E.g., `timeout(Duration.seconds(10))` limits the stream to at most 10 seconds. Parameters: `timeoutMilliseconds` (the timeout, in milliseconds). |
PStream,PStream | `unGroup(String unGroupField, String newField)` Un-group a list field in each item into multiple items: each element of the list becomes the value of `newField` in a separate item of the new stream. After un-grouping, the items in the new stream have the same number of fields as in the original stream, but the list field (`unGroupField`) is replaced by a new field (`newField`). E.g., `unGroup("emails", "email")` un-groups the `emails` field (a list) of an item into several new items, each with an `email` field. Parameters: `unGroupField` (the field to un-group, whose value should be a list), `newField` (the name of the new field in the new stream). |

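
Several of the transformations above are order-sensitive, and their effect is easiest to see on concrete input. The sketch below models the documented semantics of `keepChanges`, `sampleByCount`, `sampleByInterval`, and the difference between `groupBy` and `localGroupBy` on plain Java lists (Java 9+ for `List.of`). It is a hypothetical illustration, not PrivacyStreams code: the class `TransformSemantics` and its methods are invented for this example, items are reduced to plain values or send times, and the first-seen group order in `groupBy` is a choice of this sketch (the table only says the order of items changes).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical, list-based model of a few documented transformations.
// This is NOT the PrivacyStreams implementation; items are plain values here.
class TransformSemantics {

    // keepChanges(): keep an item only if it differs from the item right before it.
    static <T> List<T> keepChanges(List<T> items) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            if (i == 0 || !Objects.equals(items.get(i), items.get(i - 1))) {
                out.add(items.get(i));
            }
        }
        return out;
    }

    // sampleByCount(stepCount): keep one item, then drop the next stepCount items.
    static <T> List<T> sampleByCount(List<T> items, int stepCount) {
        if (stepCount < 0) {
            throw new IllegalArgumentException("stepCount must be >= 0");
        }
        List<T> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += stepCount + 1) {
            out.add(items.get(i));
        }
        return out;
    }

    // sampleByInterval(minInterval): items are modeled by their send times (ms);
    // an item is dropped if it arrives less than minInterval ms after the last kept one.
    static List<Long> sampleByInterval(List<Long> sendTimes, long minInterval) {
        List<Long> out = new ArrayList<>();
        for (long t : sendTimes) {
            if (out.isEmpty() || t - out.get(out.size() - 1) >= minInterval) {
                out.add(t);
            }
        }
        return out;
    }

    // groupBy(field): every item with the same key joins one group, wherever it occurs.
    // LinkedHashMap keeps groups in first-seen order (a choice made by this sketch).
    static <T, K> List<List<T>> groupBy(List<T> items, Function<T, K> key) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(key.apply(item), k -> new ArrayList<>()).add(item);
        }
        return new ArrayList<>(groups.values());
    }

    // localGroupBy(field): only contiguous runs of items with the same key form a group.
    static <T, K> List<List<T>> localGroupBy(List<T> items, Function<T, K> key) {
        List<List<T>> groups = new ArrayList<>();
        K previousKey = null;
        for (T item : items) {
            K k = key.apply(item);
            if (groups.isEmpty() || !Objects.equals(k, previousKey)) {
                groups.add(new ArrayList<>()); // start a new group when the key changes
            }
            groups.get(groups.size() - 1).add(item);
            previousKey = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(keepChanges(List.of(1, 1, 2, 2, 2, 1, 1)));                // [1, 2, 1]
        System.out.println(sampleByCount(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)); // [1, 4, 7, 10]
        System.out.println(sampleByInterval(List.of(1L, 3L, 7L, 11L, 40L), 10));      // [1, 11, 40]
        List<String> items = List.of("a1", "a2", "b1", "a3");
        Function<String, Character> firstChar = s -> s.charAt(0);
        System.out.println(groupBy(items, firstChar));      // [[a1, a2, a3], [b1]]
        System.out.println(localGroupBy(items, firstChar)); // [[a1, a2], [b1], [a3]]
    }
}
```

The inputs in `main` reuse the examples from the table, so the printed lists can be checked against the descriptions of `keepChanges()`, `sampleByCount(2)` and `sampleByInterval(10)`; the last two lines show that `groupBy` merges the non-adjacent `a` items while `localGroupBy` keeps `a3` in its own group.
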
Actions
Input,Output | API & Description |
---|---|
PStream,Item | `getFirst()` Get the first item in the stream. This action will block the current thread. |
PStream,Item | `getItemAt(int index)` Get the N-th item in the stream, where N is `index`. Parameters: `index` (the index of the target item). This action will block the current thread. |
PStream,`List<Item>` | `asList()` Collect the items in the stream into a list; each element of the list is an instance of `Item`. This action will block the current thread. |
PStream,`List<TValue>` | `asList(String fieldToSelect)` Select a field in each item and output the field values as a list. Parameters: `fieldToSelect` (the field to select), `<TValue>` (the type of the field value). This action will block the current thread. |
PStream,TValue | `getFieldAt(String fieldName, int index)` Get the N-th value of a given field, where N is `index`. Parameters: `fieldName` (the name of the field to select), `index` (the index of the target item). This action will block the current thread. |
PStream,TValue | `getFirst(String fieldName)` Get the first value of the given field in the stream. This action will block the current thread. |
PStream,Tout | `output(Function<List<Item>,Tout> itemsCollector)` Output the items in the stream with a function. E.g., `output(StatisticOperators.count())` outputs the number of items. Parameters: `itemsCollector` (the function used to output the current stream), `<Tout>` (the type of the result). This action will block the current thread until the result is returned. |
PStream,int | `count()` Count the number of items. This action will block the current thread. |
PStream,void | `debug()` Print the items for debugging. This action will NOT block the current thread. |
PStream,void | `forEach(Function<Item,Void> callback)` Invoke a callback with each item. Parameters: `callback` (the callback to invoke for each item). This action will NOT block the current thread. |
PStream,void | `forEach(String fieldToSelect, Function<TValue,Void> callback)` Invoke a callback with a certain field of each item. Parameters: `fieldToSelect` (the name of the field to pass to the callback), `callback` (the callback to invoke with each item's field), `<TValue>` (the type of the field). This action will NOT block the current thread. |
PStream,void | `idle()` Do nothing with the items. This action will NOT block the current thread. |
PStream,void | `ifPresent(Function<Item,Void> callback)` Invoke a callback with an item once an item is present. Parameters: `callback` (the callback to invoke once the item is present). This action will NOT block the current thread. |
PStream,void | `ifPresent(String fieldToSelect, Function<TValue,Void> callback)` Invoke a callback with a field value of an item once the field value is present. Parameters: `fieldToSelect` (the name of the field to pass to the callback), `callback` (the callback to invoke once the field value is present), `<TValue>` (the type of the field). This action will NOT block the current thread. |
PStream,void | `onChange(Function<Item,Void> callback)` Invoke a callback with an item whenever it changes (i.e., differs from the previous item). Parameters: `callback` (the callback to invoke with the changed item). This action will NOT block the current thread. |
PStream,void | `onChange(String fieldToSelect, Function<TValue,Void> callback)` Invoke a callback with a field value of an item whenever that field value changes. Parameters: `fieldToSelect` (the name of the field to pass to the callback), `callback` (the callback to invoke with the changed field value), `<TValue>` (the type of the field). This action will NOT block the current thread. |
PStream,void | `output(Function<List<Item>,Tout> resultComputer, Callback<Tout> resultHandler)` Output the items in the stream with a function and pass the result to a callback. E.g., `output(StatisticOperators.count(), new Callback<Integer>(){...})` counts the number of items and passes the count to the callback. Parameters: `resultComputer` (the function used to compute the result from the items in the current stream), `resultHandler` (the function that handles the result), `<Tout>` (the type of the result). This action will NOT block the current thread. |
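
The Actions table separates actions that block the current thread (such as `count()` and `asList()`) from actions that return immediately and deliver results through callbacks (such as `onChange(...)` and the two-argument `output(...)`). The sketch below models that distinction in plain Java with an `ExecutorService`. It is hypothetical: `ToyActions` and its methods are invented for illustration and are not the PrivacyStreams API; callbacks are modeled with `java.util.function.Consumer` rather than the library's `Function<..., Void>` and `Callback<Tout>` types; the non-blocking methods return a `Future` only so that a caller can wait if it chooses; and treating the first item as a change in `onChange` is an assumption that mirrors the `keepChanges()` example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of blocking vs. non-blocking actions; NOT the PrivacyStreams API.
class ToyActions<T> {
    private final List<T> items;
    private final ExecutorService executor;

    ToyActions(List<T> items, ExecutorService executor) {
        this.items = new ArrayList<>(items); // defensive copy of the "stream" contents
        this.executor = executor;
    }

    // Blocking, like count(): the caller receives the value directly.
    int count() {
        return items.size();
    }

    // Blocking, like asList(): returns all items to the caller.
    List<T> asList() {
        return new ArrayList<>(items);
    }

    // Non-blocking, like onChange(callback): schedules the work and returns at once.
    // The first item is treated as a change (assumption mirroring keepChanges()).
    Future<?> onChange(Consumer<T> callback) {
        return executor.submit(() -> {
            boolean first = true;
            T previous = null;
            for (T item : items) {
                if (first || !Objects.equals(item, previous)) {
                    callback.accept(item);
                }
                previous = item;
                first = false;
            }
        });
    }

    // Non-blocking, like output(resultComputer, resultHandler): the result is computed
    // on the executor thread and handed to resultHandler; the caller is not blocked.
    <R> Future<?> output(Function<List<T>, R> resultComputer, Consumer<R> resultHandler) {
        return executor.submit(() -> resultHandler.accept(resultComputer.apply(items)));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ToyActions<Integer> stream = new ToyActions<>(List.of(1, 1, 2, 2, 2, 1, 1), executor);

        System.out.println("count = " + stream.count()); // blocking: value returned directly

        // Non-blocking: both calls return immediately; callbacks run on the executor thread.
        Future<?> changes = stream.onChange(item -> System.out.println("changed to " + item));
        Future<?> size = stream.output(List::size, n -> System.out.println("output = " + n));

        // Wait only so the demo prints everything before shutting the executor down.
        changes.get();
        size.get();
        executor.shutdown();
    }
}
```

In this sketch the single-thread executor runs tasks in submission order, so the `onChange` callbacks fire before the `output` handler; calling `Future.get()` is only needed when the caller actually wants to wait for a non-blocking action to finish.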