Skip to content

Commit

Permalink
Merge pull request #8 from andodet/dev
Browse files Browse the repository at this point in the history
0.0.2 CRAN release
  • Loading branch information
andodet committed Nov 26, 2021
2 parents 4fa25a0 + 693a03f commit d14dda8
Show file tree
Hide file tree
Showing 29 changed files with 177 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help
on:
push:
branches: [main, master]
branches: [main, master, dev]
pull_request:
branches: [main, master]
branches: [main, master, dev]

name: R-CMD-check-ascran

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/testthat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Need help debugging build failures? Start at https://github.com/r-lib/actions#where-to-find-help
on:
push:
branches: [main, master]
branches: [main, master, dev]
pull_request:
branches: [main, master]
branches: [main, master, dev]

name: testthat

Expand Down
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: googlePubsubR
Title: R Interface for Google 'Cloud Pub/Sub' REST API
Version: 0.0.1
Version: 0.0.2
Authors@R:
person(given = "Andrea",
family = "Dodet",
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export(SchemaSettings)
export(Snapshot)
export(Subscription)
export(Topic)
export(msg_decode)
export(msg_encode)
export(pubsub_auth)
export(schemas_create)
export(schemas_delete)
Expand Down
7 changes: 6 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# googlePubsubR 0.0.2

* Add helpers to encode/decode messages: `msg_encode`, `msg_decode`.
* `pubsub_auth` now prompts the correct package name (fixes #6)

# googlePubsubR 0.0.1

* Initial version
* Initial version
2 changes: 1 addition & 1 deletion R/auth.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pubsub_auth <- function(json_file = Sys.getenv("GCP_AUTH_FILE"),
if(is.null(json_file)){
gar_auth(token = token,
email = email,
package = "googleCloudStorageR")
package = "googlePubsubR")
} else {
gar_auth_service(json_file = json_file)
}
Expand Down
7 changes: 4 additions & 3 deletions R/objects.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' Builds a PubsubMessage Object
#'
#' @param data `character` The message data field as a base64 encoded string
#' @param message_id `character ID of this message, assigned by the server when the message
#' @param message_id `character` ID of this message, assigned by the server when the message
#' is published
#' @param ordering_key `character` If non-empty, identifies related messages for which publish
#' order should be respected
Expand Down Expand Up @@ -60,11 +60,12 @@ Topic <- function(labels = NULL, name = NULL, kms_key_name = NULL, satisfies_pzs
#' Builds a DqlPolicy object
#'
#' @param dlq_topic `character`, `Topic` Required, topic name or instance of a topic object
#' @param max_delivery_attempts `numeric`
#' @param max_delivery_attempts `numeric` Number of delivery attempts for any message. The
#' value must be between 5 and 100.
#'
#' @return `DlqPolicy` object
#' @export
#'
#'
#' @family Object functions
DlqPolicy <- function(dlq_topic, max_delivery_attempts) {
dlq_topic <- as.topic_name(dlq_topic)
Expand Down
6 changes: 3 additions & 3 deletions R/schemas.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ schemas_validate <- function(schema, project = Sys.getenv("GCP_PROJECT")) {
#' this is a continuation of a prior `ListSchemas` call, and that the system should return
#' the next page of data
#'
#' @return A `data.frame`
#' @return A `data.frame` containing all schema objects and properties
#'
#' @importFrom googleAuthR gar_api_generator
#' @family Schema functions
Expand All @@ -122,7 +122,7 @@ schemas_list <- function(project = Sys.getenv("GCP_PROJECT"), pageSize = NULL,
#'
#' @param schema `character`, `Schema` Required, schema name or an instance of a `Schema` object
#'
#' @return `logical`
#' @return `logical` TRUE if the schema exists
#' @family Schema functions
#' @export
schemas_exists <- function(schema) {
Expand Down Expand Up @@ -188,7 +188,7 @@ schemas_delete <- function(name) {
#' @param encoding `character` The encoding of the message
#' @param project `character` A GCP project id
#'
#' @return `logical`
#' @return `logical` TRUE if successfully validated
#'
#' @importFrom googleAuthR gar_api_generator
#' @family Schema functions
Expand Down
3 changes: 2 additions & 1 deletion R/snapshots.R
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ snapshots_list <- function(project = Sys.getenv("GCP_PROJECT"), pageSize = NULL,
#'
#' @param snapshot `character`, `Snapshot` Required, snapshot name or an instance of a `Snapshot` object
#'
#' @return `logical`
#' @return `logical` TRUE if snapshot exists
#'
#' @family Snapshot functions
#' @export
snapshots_exists <- function(snapshot) {
Expand Down
16 changes: 9 additions & 7 deletions R/subscriptions.R
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ subscriptions_detach <- function(subscription) {
#' @param pageToken `character` The value returned by the last `subscriptions_list`;
#' indicates that this is a continuation of a prior `subscriptions_list` call
#'
#' @return `list`
#' @return `list` A list containing all subscriptions
#'
#' @importFrom googleAuthR gar_api_generator
#' @family Subscription functions
Expand Down Expand Up @@ -228,7 +228,7 @@ subscriptions_pull <- function(subscription, max_messages = 100) {
#' @param subscription `character`, `Subscription` Required, the subscription whose messages
#' are being acknowledged
#'
#' @return `logical`
#' @return `logical` TRUE if message(s) was successfully acknowledged
#' @importFrom googleAuthR gar_api_generator
#' @family Subscription functions
#' @export
Expand All @@ -249,7 +249,7 @@ subscriptions_ack <- function(ack_ids, subscription) {
#' @param subscription `character`, `Subscription` Required, subscription name or instance of
#' a `Subscription` object
#'
#' @return `logical`
#' @return `logical` TRUE if the subscription exist
#' @family Subscription functions
#' @export
subscriptions_exists <- function(subscription) {
Expand Down Expand Up @@ -344,7 +344,7 @@ subscriptions_patch <- function(subscription,
#' @param time `character` A timestamp in RFC3339 UTC "Zulu" format
#' @param snapshot `character`, `Snapshot` A Snapshot name or a `Snapshot` object
#'
#' @return `logical`
#' @return `logical` TRUE when succesfull seeked
#'
#' @importFrom googleAuthR gar_api_generator
#' @family Subscription functions
Expand All @@ -368,15 +368,17 @@ subscriptions_seek <- function(subscription, time = NULL, snapshot = NULL) {
}
}

#' Updates an existing subscription
#' Modify the ack deadline for a subscription
#'
#' Certain properties of a subscription, such as its topic, are not modifiable.
#' This method is useful to indicate that more time is needed to process a message by the
#' subscriber, or to make the message available for redelivery if the processing was
#' interrupted.
#'
#' @param subscription `character`, `Subscription` A subscription name or `Subscription` object
#' @param ack_ids `character` A vector containing ackIDs. They can be acquired using
#' @param ack_deadline `numeric` The new ack deadline (in seconds)
#'
#' @return `logical`
#' @return `logical` TRUE if successfully modified
#'
#' @importFrom googleAuthR gar_api_generator
#' @family Subscription functions
Expand Down
45 changes: 45 additions & 0 deletions R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,51 @@ secs_to_str <- function(x) {
paste0(x, "s")
}


#' Decode Pub/Sub message
#'
#' Converts a Pub/Sub message into an object
#'
#' @param x A base64 encoded string
#'
#' @examples
#' \dontrun{
#' library(jsonlite)
#'
#' pulled_msgs$receivedMessages$messages$data %>%
#' msg_decode() %>%
#' fromJSON()
#' }
#'
#' @return A deserialized object
#' @export
msg_decode <- function(x) {
x %>%
base64enc::base64decode() %>%
rawToChar()
}

#' Encode Pub/Sub message
#'
#' Converts an object into a base64 string
#'
#' @param x A serializeable object
#'
#' @examples
#' \dontrun{
#' mtcars %>%
#' msg_encode() %>%
#' PubsubMessage()
#' }
#'
#' @return `character` a base64 encoded string
#' @export
msg_encode <- function(x) {
x %>%
charToRaw() %>%
base64enc::base64encode()
}

#' Pipe operator
#'
#' See \code{magrittr::\link[magrittr:pipe]{\%>\%}} for details.
Expand Down
16 changes: 11 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ sub_readme <- subscriptions_create("readme-sub", topic_readme)
# Prepare the message
msg <- mtcars %>%
toJSON(auto_unbox = TRUE) %>%
charToRaw() %>%
# Pub/Sub expects a base64 encoded string
base64encode() %>%
msg_encode() %>%
PubsubMessage()

# Publish the message!
Expand All @@ -65,8 +64,7 @@ topics_publish(msg, topic_readme)
msgs_pull <- subscriptions_pull(sub_readme)

msg_decoded <- msgs_pull$receivedMessages$message$data %>%
base64decode() %>%
rawToChar() %>%
msg_decode() %>%
fromJSON()

head(msg_decoded)
Expand All @@ -80,7 +78,15 @@ head(msg_decoded)
# Hornet Sportabout 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2
# Valiant 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1

#Cleanup resources
# We can acknowledge that the message has been consumed
subscriptions_ack(msgs_pull$receivedMessages$ackId, sub_readme)
# [1] TRUE

# A subsequent pull will return no messages from the server
subscriptions_pull(sub_readme)
# named list()

# Cleanup resources
topics_delete(topic_readme)
subscriptions_delete(sub_readme)
```
Expand Down
4 changes: 4 additions & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ reference:
- title: "Auth"
contents:
- pubsub_auth
- title: "Utils"
contents:
- msg_encode
- msg_decode
13 changes: 2 additions & 11 deletions cran-comments.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,8 @@

This is a resubmission. In this version I have:

* Added `\value` tags as requested to the following functions:
- `pubsub_auth.Rd`
- `pubsub_auth.Rd`
- `schemas_delete.Rd`
- `schemas_validate.Rd`
- `snapshots_delete.Rd`
- `subscriptions_delete.Rd`
- `subscriptions_detach.Rd`
- `topics_delete.Rd`

* Enclosed 'Cloud Pub/Sub' in single quotes in title and description
* Add helpers to encode/decode messages: `msg_encode`, `msg_decode`.
* `pubsub_auth` now prompts the correct package name (fixes #6)

## Test environments

Expand Down
13 changes: 5 additions & 8 deletions inst/shiny/consumer_example/app.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ gen_msg <- function() {
) %>%
as.list() %>%
toJSON(auto_unbox = TRUE) %>%
charToRaw() %>%
base64encode() %>%
msg_encode() %>%
PubsubMessage()
}

Expand All @@ -36,8 +35,7 @@ get_data <- function() {
# formed for whatever reason
out <- lapply(msgs$receivedMessages$message$data, function(msg) {
msg %>%
base64decode() %>%
rawToChar() %>%
msg_decode() %>%
fromJSON(flatten = TRUE, simplifyDataFrame = TRUE) %>%
as.data.frame()
}) %>% do.call(rbind, .)
Expand Down Expand Up @@ -79,7 +77,7 @@ server <- function(input, output, session) {

# Set up a message consumer in background sessions (poll messages every 2 seconds)
observe({
invalidateLater(20000, session)
invalidateLater(2000, session)

future_promise({
pubsub_auth() # Authenticated session is not passed to futures' env
Expand All @@ -93,10 +91,9 @@ server <- function(input, output, session) {
duration = 3,
type = "warning"
)
# Append to the reactive dataframe
out_df$df <- rbind(out_df$df, res)
}

# Append to the reactive dataframe
out_df$df <- rbind(out_df$df, res)
})

# Hide the future, this is a fire and forget hack and allows avoid blocking
Expand Down
3 changes: 2 additions & 1 deletion man/DlqPolicy.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/PubsubMessage.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions man/msg_decode.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d14dda8

Please sign in to comment.