Compare commits
20 Commits
|
@ -0,0 +1,175 @@
|
|||
package fcm
|
||||
|
||||
/*
|
||||
const (
|
||||
// Batch Send API
|
||||
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
|
||||
// Requests to the endpoint will start failing after 6/21/2024.
|
||||
// Migrate to the standard HTTP v1 API send method, which supports HTTP/2 for multiplexing.
|
||||
)
|
||||
|
||||
type ClientConfig struct {
|
||||
SendEndpoint string
|
||||
//BatchSendEndpoint string
|
||||
}
|
||||
|
||||
// Deprecated: Use SendEach instead.
|
||||
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
|
||||
return c.doSendMessages(messages, false)
|
||||
}
|
||||
|
||||
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
|
||||
return c.doSendMessages(messages, true)
|
||||
}
|
||||
|
||||
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
|
||||
messageCount := len(messages)
|
||||
if messageCount == 0 {
|
||||
return MultiSendResponse{}, nil
|
||||
}
|
||||
if messageCount > maxMessages {
|
||||
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
|
||||
}
|
||||
|
||||
accessToken, err := c.ts.Token()
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
w := multipart.NewWriter(&body)
|
||||
w.SetBoundary(multipartBoundary)
|
||||
|
||||
for index, msg := range messages {
|
||||
req := SendRequest{
|
||||
ValidateOnly: validateOnly,
|
||||
Message: msg,
|
||||
}
|
||||
|
||||
body, err := c.makeMessageRequest(req)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
if err := writePartTo(w, body, index); err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
accessToken.SetAuthHeader(request)
|
||||
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
|
||||
|
||||
response, err := c.hc.Do(request)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
return c.makeMultiSendResponse(response, messageCount)
|
||||
}
|
||||
|
||||
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
|
||||
reqJson, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
|
||||
request.Header.Set("User-Agent", "")
|
||||
|
||||
var body bytes.Buffer
|
||||
if err := request.Write(&body); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return body.Bytes(), nil
|
||||
}
|
||||
|
||||
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
|
||||
header := make(textproto.MIMEHeader)
|
||||
header.Set("Content-Type", "application/http")
|
||||
header.Set("Content-Transfer-Encoding", "binary")
|
||||
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
|
||||
|
||||
part, err := w.CreatePart(header)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if _, err := part.Write(bytes); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
|
||||
responses := make([]SendResponse, 0, totalCount)
|
||||
var fails int
|
||||
|
||||
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader := multipart.NewReader(response.Body, params["boundary"])
|
||||
for {
|
||||
part, err := reader.NextPart()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
resp, err := makeSendResponseFromPart(part)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
responses = append(responses, resp)
|
||||
if resp.HasError() {
|
||||
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
|
||||
fails++
|
||||
}
|
||||
}
|
||||
|
||||
return MultiSendResponse{
|
||||
Responses: responses,
|
||||
Sent: totalCount - fails,
|
||||
Failed: fails,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
|
||||
response, err := http.ReadResponse(bufio.NewReader(part), nil)
|
||||
if err != nil {
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
var resp SendResponse
|
||||
if err := json.Unmarshal(body, &resp); err != nil {
|
||||
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
*/
|
254
fcm.go
254
fcm.go
|
@ -10,26 +10,24 @@
|
|||
package fcm
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"git.bit5.ru/backend/errors"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"golang.org/x/oauth2"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const tracerName = "git.bit5.ru/backend/fcm"
|
||||
|
@ -43,15 +41,13 @@ const (
|
|||
|
||||
maxMessages = 500
|
||||
multipartBoundary = "msg_boundary"
|
||||
|
||||
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
|
||||
)
|
||||
|
||||
var (
|
||||
AuthScopes = []string{"https://www.googleapis.com/auth/firebase.messaging"}
|
||||
)
|
||||
|
||||
func MakeSendEndpoint(projectId string) string {
|
||||
func makeSendEndpoint(projectId string) string {
|
||||
return fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId)
|
||||
}
|
||||
|
||||
|
@ -70,7 +66,7 @@ type Credentials struct {
|
|||
}
|
||||
|
||||
func ReadCredentialsFromFile(filename string) (Credentials, error) {
|
||||
data, err := ioutil.ReadFile(filename)
|
||||
data, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return Credentials{}, errors.WithStack(err)
|
||||
}
|
||||
|
@ -83,24 +79,19 @@ func ReadCredentialsFromFile(filename string) (Credentials, error) {
|
|||
return c, nil
|
||||
}
|
||||
|
||||
type ClientConfig struct {
|
||||
SendEndpoint string
|
||||
BatchSendEndpoint string
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
cfg ClientConfig
|
||||
ts oauth2.TokenSource
|
||||
hc *http.Client
|
||||
logger logr.Logger
|
||||
sendEndpoint string
|
||||
ts oauth2.TokenSource
|
||||
hc *http.Client
|
||||
logger logr.Logger
|
||||
}
|
||||
|
||||
func NewClient(cfg ClientConfig, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
|
||||
func NewClient(projectId string, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
|
||||
return &Client{
|
||||
cfg: cfg,
|
||||
ts: ts,
|
||||
hc: hc,
|
||||
logger: logger,
|
||||
sendEndpoint: makeSendEndpoint(projectId),
|
||||
ts: ts,
|
||||
hc: hc,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -172,7 +163,11 @@ func (c *Client) Validate(ctx context.Context, message Message) (string, error)
|
|||
|
||||
func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabled bool) (SendResponse, error) {
|
||||
|
||||
_, span := tracer.Start(ctx, "Client.doSendRequest")
|
||||
spanAttrs := trace.WithAttributes(
|
||||
attribute.String("token", req.Message.Token),
|
||||
)
|
||||
|
||||
_, span := tracer.Start(ctx, "Client.doSendRequest", spanAttrs)
|
||||
defer span.End()
|
||||
|
||||
accessToken, err := c.ts.Token()
|
||||
|
@ -188,11 +183,14 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
|
|||
span.RecordError(err)
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
span.SetAttributes(
|
||||
attribute.String("request.body", string(data)),
|
||||
)
|
||||
if loggerEnabled {
|
||||
c.logger.Info("sending", "message", data)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewReader(data))
|
||||
request, err := http.NewRequest(http.MethodPost, c.sendEndpoint, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
|
@ -204,6 +202,8 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
|
|||
|
||||
response, err := c.hc.Do(request)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
@ -219,7 +219,7 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
|
|||
|
||||
span.SetAttributes(
|
||||
attribute.Int("response.status_code", response.StatusCode),
|
||||
attribute.String("response_body", bodyStr),
|
||||
attribute.String("response.body", bodyStr),
|
||||
)
|
||||
|
||||
if response.StatusCode != http.StatusOK {
|
||||
|
@ -240,172 +240,23 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// Deprecated: Use SendEach instead.
|
||||
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
|
||||
return c.doSendMessages(messages, false)
|
||||
}
|
||||
|
||||
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
|
||||
return c.doSendMessages(messages, true)
|
||||
}
|
||||
|
||||
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
|
||||
messageCount := len(messages)
|
||||
if messageCount == 0 {
|
||||
return MultiSendResponse{}, nil
|
||||
}
|
||||
if messageCount > maxMessages {
|
||||
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
|
||||
}
|
||||
|
||||
accessToken, err := c.ts.Token()
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
w := multipart.NewWriter(&body)
|
||||
w.SetBoundary(multipartBoundary)
|
||||
|
||||
for index, msg := range messages {
|
||||
req := SendRequest{
|
||||
ValidateOnly: validateOnly,
|
||||
Message: msg,
|
||||
}
|
||||
|
||||
body, err := c.makeMessageRequest(req)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
if err := writePartTo(w, body, index); err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
accessToken.SetAuthHeader(request)
|
||||
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
|
||||
|
||||
response, err := c.hc.Do(request)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
return c.makeMultiSendResponse(response, messageCount)
|
||||
}
|
||||
|
||||
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
|
||||
reqJson, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
|
||||
request.Header.Set("User-Agent", "")
|
||||
|
||||
var body bytes.Buffer
|
||||
if err := request.Write(&body); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return body.Bytes(), nil
|
||||
}
|
||||
|
||||
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
|
||||
header := make(textproto.MIMEHeader)
|
||||
header.Set("Content-Type", "application/http")
|
||||
header.Set("Content-Transfer-Encoding", "binary")
|
||||
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
|
||||
|
||||
part, err := w.CreatePart(header)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if _, err := part.Write(bytes); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
|
||||
responses := make([]SendResponse, 0, totalCount)
|
||||
var fails int
|
||||
|
||||
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
reader := multipart.NewReader(response.Body, params["boundary"])
|
||||
for {
|
||||
part, err := reader.NextPart()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return MultiSendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
resp, err := makeSendResponseFromPart(part)
|
||||
if err != nil {
|
||||
return MultiSendResponse{}, err
|
||||
}
|
||||
|
||||
responses = append(responses, resp)
|
||||
if resp.HasError() {
|
||||
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
|
||||
fails++
|
||||
}
|
||||
}
|
||||
|
||||
return MultiSendResponse{
|
||||
Responses: responses,
|
||||
Sent: totalCount - fails,
|
||||
Failed: fails,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
|
||||
response, err := http.ReadResponse(bufio.NewReader(part), nil)
|
||||
if err != nil {
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
defer response.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return SendResponse{}, errors.WithStack(err)
|
||||
}
|
||||
|
||||
var resp SendResponse
|
||||
if err := json.Unmarshal(body, &resp); err != nil {
|
||||
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) SendEach(ctx context.Context, messages []Message) (MessageMultiSendResponse, error) {
|
||||
|
||||
ctx, span := tracer.Start(ctx, "Client.SendEach")
|
||||
defer span.End()
|
||||
|
||||
return c.doSendEachInBatch(ctx, messages, false)
|
||||
resp, err := c.doSendEachInBatch(ctx, messages, false)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
return MessageMultiSendResponse{}, err
|
||||
}
|
||||
|
||||
if resp.Failed > 0 {
|
||||
span.SetStatus(codes.Error, "Some notifications not sent.")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) doSendEachInBatch(
|
||||
|
@ -421,8 +272,12 @@ func (c *Client) doSendEachInBatch(
|
|||
if messageCount == 0 {
|
||||
return MessageMultiSendResponse{}, nil
|
||||
}
|
||||
|
||||
if messageCount > maxMessages {
|
||||
return MessageMultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
|
||||
err := errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
return MessageMultiSendResponse{}, err
|
||||
}
|
||||
|
||||
var responses = make([]MessageSendResponse, len(messages))
|
||||
|
@ -448,6 +303,7 @@ func (c *Client) doSendEachInBatch(
|
|||
MessageID: resp,
|
||||
}
|
||||
} else {
|
||||
span.SetStatus(codes.Error, "Some notifications not sent.")
|
||||
responses[idx] = MessageSendResponse{
|
||||
Success: false,
|
||||
Error: err,
|
||||
|
@ -458,18 +314,28 @@ func (c *Client) doSendEachInBatch(
|
|||
// Wait for all Validate/Send calls to finish
|
||||
wg.Wait()
|
||||
|
||||
successCount := 0
|
||||
sentAmount := 0
|
||||
for _, r := range responses {
|
||||
if r.Success {
|
||||
successCount++
|
||||
sentAmount++
|
||||
}
|
||||
}
|
||||
|
||||
return MessageMultiSendResponse{
|
||||
failedAmount := len(responses) - sentAmount
|
||||
|
||||
span.SetAttributes(
|
||||
attribute.Int("resp.total", len(responses)),
|
||||
attribute.Int("resp.sent_amount", sentAmount),
|
||||
attribute.Int("resp.failed_amount", failedAmount),
|
||||
)
|
||||
|
||||
resp := MessageMultiSendResponse{
|
||||
Responses: responses,
|
||||
Sent: successCount,
|
||||
Failed: len(responses) - successCount,
|
||||
}, nil
|
||||
Sent: sentAmount,
|
||||
Failed: failedAmount,
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
type MessageSendResponse struct {
|
||||
|
|
16
message.go
16
message.go
|
@ -1,9 +1,14 @@
|
|||
package fcm
|
||||
|
||||
// Message to send by Firebase Cloud Messaging Service.
|
||||
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#resource:-message
|
||||
type Message struct {
|
||||
Token string `json:"token,omitempty"`
|
||||
Topic string `json:"topic,omitempty"`
|
||||
Condition string `json:"condition,omitempty"`
|
||||
// Union field target can be only one of the following:
|
||||
Token string `json:"token,omitempty"`
|
||||
Topic string `json:"topic,omitempty"`
|
||||
Condition string `json:"condition,omitempty"`
|
||||
// End of list of possible types for union field target.
|
||||
|
||||
Data map[string]string `json:"data,omitempty"`
|
||||
Notification *Notification `json:"notification,omitempty"`
|
||||
FcmOptions *FcmOptions `json:"fcm_options,omitempty"`
|
||||
|
@ -13,6 +18,7 @@ type Message struct {
|
|||
}
|
||||
|
||||
// Basic notification template to use across all platforms.
|
||||
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#notification
|
||||
type Notification struct {
|
||||
Title string `json:"title,omitempty"`
|
||||
Body string `json:"body,omitempty"`
|
||||
|
@ -170,8 +176,8 @@ type ApnsPayload struct {
|
|||
Aps ApnsPayloadKeys `json:"aps"`
|
||||
|
||||
// TODO: add support for custom keys
|
||||
// https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/generating_a_remote_notification
|
||||
customKeys map[string]interface{}
|
||||
// https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification
|
||||
customKeys map[string]any
|
||||
}
|
||||
|
||||
type ApnsPayloadKeys struct {
|
||||
|
|
Loading…
Reference in New Issue