Compare commits

..

No commits in common. "master" and "v1.3.8" have entirely different histories.

3 changed files with 199 additions and 241 deletions

View File

@ -1,175 +0,0 @@
package fcm
/*
const (
// Batch Send API
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
// Requests to the endpoint will start failing after 6/21/2024.
// Migrate to the standard HTTP v1 API send method, which supports HTTP/2 for multiplexing.
)
type ClientConfig struct {
SendEndpoint string
//BatchSendEndpoint string
}
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
*/

249
fcm.go
View File

@ -10,24 +10,26 @@
package fcm package fcm
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http" "net/http"
"os" "net/textproto"
"sync" "sync"
"git.bit5.ru/backend/errors" "git.bit5.ru/backend/errors"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"golang.org/x/oauth2" "golang.org/x/oauth2"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
) )
const tracerName = "git.bit5.ru/backend/fcm" const tracerName = "git.bit5.ru/backend/fcm"
@ -41,13 +43,15 @@ const (
maxMessages = 500 maxMessages = 500
multipartBoundary = "msg_boundary" multipartBoundary = "msg_boundary"
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
) )
var ( var (
AuthScopes = []string{"https://www.googleapis.com/auth/firebase.messaging"} AuthScopes = []string{"https://www.googleapis.com/auth/firebase.messaging"}
) )
func makeSendEndpoint(projectId string) string { func MakeSendEndpoint(projectId string) string {
return fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId) return fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId)
} }
@ -66,7 +70,7 @@ type Credentials struct {
} }
func ReadCredentialsFromFile(filename string) (Credentials, error) { func ReadCredentialsFromFile(filename string) (Credentials, error) {
data, err := os.ReadFile(filename) data, err := ioutil.ReadFile(filename)
if err != nil { if err != nil {
return Credentials{}, errors.WithStack(err) return Credentials{}, errors.WithStack(err)
} }
@ -79,19 +83,24 @@ func ReadCredentialsFromFile(filename string) (Credentials, error) {
return c, nil return c, nil
} }
type Client struct { type ClientConfig struct {
sendEndpoint string SendEndpoint string
ts oauth2.TokenSource BatchSendEndpoint string
hc *http.Client
logger logr.Logger
} }
func NewClient(projectId string, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client { type Client struct {
cfg ClientConfig
ts oauth2.TokenSource
hc *http.Client
logger logr.Logger
}
func NewClient(cfg ClientConfig, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
return &Client{ return &Client{
sendEndpoint: makeSendEndpoint(projectId), cfg: cfg,
ts: ts, ts: ts,
hc: hc, hc: hc,
logger: logger, logger: logger,
} }
} }
@ -163,11 +172,7 @@ func (c *Client) Validate(ctx context.Context, message Message) (string, error)
func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabled bool) (SendResponse, error) { func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabled bool) (SendResponse, error) {
spanAttrs := trace.WithAttributes( _, span := tracer.Start(ctx, "Client.doSendRequest")
attribute.String("token", req.Message.Token),
)
_, span := tracer.Start(ctx, "Client.doSendRequest", spanAttrs)
defer span.End() defer span.End()
accessToken, err := c.ts.Token() accessToken, err := c.ts.Token()
@ -190,7 +195,7 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
c.logger.Info("sending", "message", data) c.logger.Info("sending", "message", data)
} }
request, err := http.NewRequest(http.MethodPost, c.sendEndpoint, bytes.NewReader(data)) request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewReader(data))
if err != nil { if err != nil {
span.SetStatus(codes.Error, err.Error()) span.SetStatus(codes.Error, err.Error())
span.RecordError(err) span.RecordError(err)
@ -240,23 +245,172 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
return resp, nil return resp, nil
} }
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
func (c *Client) SendEach(ctx context.Context, messages []Message) (MessageMultiSendResponse, error) { func (c *Client) SendEach(ctx context.Context, messages []Message) (MessageMultiSendResponse, error) {
ctx, span := tracer.Start(ctx, "Client.SendEach") ctx, span := tracer.Start(ctx, "Client.SendEach")
defer span.End() defer span.End()
resp, err := c.doSendEachInBatch(ctx, messages, false) return c.doSendEachInBatch(ctx, messages, false)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
}
if resp.Failed > 0 {
span.SetStatus(codes.Error, "Some notifications not sent.")
}
return resp, nil
} }
func (c *Client) doSendEachInBatch( func (c *Client) doSendEachInBatch(
@ -272,12 +426,8 @@ func (c *Client) doSendEachInBatch(
if messageCount == 0 { if messageCount == 0 {
return MessageMultiSendResponse{}, nil return MessageMultiSendResponse{}, nil
} }
if messageCount > maxMessages { if messageCount > maxMessages {
err := errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount)) return MessageMultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
} }
var responses = make([]MessageSendResponse, len(messages)) var responses = make([]MessageSendResponse, len(messages))
@ -303,7 +453,6 @@ func (c *Client) doSendEachInBatch(
MessageID: resp, MessageID: resp,
} }
} else { } else {
span.SetStatus(codes.Error, "Some notifications not sent.")
responses[idx] = MessageSendResponse{ responses[idx] = MessageSendResponse{
Success: false, Success: false,
Error: err, Error: err,
@ -314,28 +463,18 @@ func (c *Client) doSendEachInBatch(
// Wait for all Validate/Send calls to finish // Wait for all Validate/Send calls to finish
wg.Wait() wg.Wait()
sentAmount := 0 successCount := 0
for _, r := range responses { for _, r := range responses {
if r.Success { if r.Success {
sentAmount++ successCount++
} }
} }
failedAmount := len(responses) - sentAmount return MessageMultiSendResponse{
span.SetAttributes(
attribute.Int("resp.total", len(responses)),
attribute.Int("resp.sent_amount", sentAmount),
attribute.Int("resp.failed_amount", failedAmount),
)
resp := MessageMultiSendResponse{
Responses: responses, Responses: responses,
Sent: sentAmount, Sent: successCount,
Failed: failedAmount, Failed: len(responses) - successCount,
} }, nil
return resp, nil
} }
type MessageSendResponse struct { type MessageSendResponse struct {

View File

@ -1,14 +1,9 @@
package fcm package fcm
// Message to send by Firebase Cloud Messaging Service.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#resource:-message
type Message struct { type Message struct {
// Union field target can be only one of the following: Token string `json:"token,omitempty"`
Token string `json:"token,omitempty"` Topic string `json:"topic,omitempty"`
Topic string `json:"topic,omitempty"` Condition string `json:"condition,omitempty"`
Condition string `json:"condition,omitempty"`
// End of list of possible types for union field target.
Data map[string]string `json:"data,omitempty"` Data map[string]string `json:"data,omitempty"`
Notification *Notification `json:"notification,omitempty"` Notification *Notification `json:"notification,omitempty"`
FcmOptions *FcmOptions `json:"fcm_options,omitempty"` FcmOptions *FcmOptions `json:"fcm_options,omitempty"`
@ -18,7 +13,6 @@ type Message struct {
} }
// Basic notification template to use across all platforms. // Basic notification template to use across all platforms.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#notification
type Notification struct { type Notification struct {
Title string `json:"title,omitempty"` Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"` Body string `json:"body,omitempty"`
@ -176,8 +170,8 @@ type ApnsPayload struct {
Aps ApnsPayloadKeys `json:"aps"` Aps ApnsPayloadKeys `json:"aps"`
// TODO: add support for custom keys // TODO: add support for custom keys
// https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification // https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/generating_a_remote_notification
customKeys map[string]any customKeys map[string]interface{}
} }
type ApnsPayloadKeys struct { type ApnsPayloadKeys struct {