Compare commits

..

No commits in common. "master" and "v1.3.6" have entirely different histories.

3 changed files with 200 additions and 247 deletions

View File

@ -1,175 +0,0 @@
package fcm
/*
const (
// Batch Send API
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
// Requests to the endpoint will start failing after 6/21/2024.
// Migrate to the standard HTTP v1 API send method, which supports HTTP/2 for multiplexing.
)
type ClientConfig struct {
SendEndpoint string
//BatchSendEndpoint string
}
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
*/

256
fcm.go
View File

@ -10,24 +10,26 @@
package fcm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"os"
"net/textproto"
"sync"
"git.bit5.ru/backend/errors"
"github.com/go-logr/logr"
"golang.org/x/oauth2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const tracerName = "git.bit5.ru/backend/fcm"
@ -41,13 +43,15 @@ const (
maxMessages = 500
multipartBoundary = "msg_boundary"
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
)
var (
AuthScopes = []string{"https://www.googleapis.com/auth/firebase.messaging"}
)
func makeSendEndpoint(projectId string) string {
func MakeSendEndpoint(projectId string) string {
return fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId)
}
@ -66,7 +70,7 @@ type Credentials struct {
}
func ReadCredentialsFromFile(filename string) (Credentials, error) {
data, err := os.ReadFile(filename)
data, err := ioutil.ReadFile(filename)
if err != nil {
return Credentials{}, errors.WithStack(err)
}
@ -79,19 +83,24 @@ func ReadCredentialsFromFile(filename string) (Credentials, error) {
return c, nil
}
type Client struct {
sendEndpoint string
ts oauth2.TokenSource
hc *http.Client
logger logr.Logger
type ClientConfig struct {
SendEndpoint string
BatchSendEndpoint string
}
func NewClient(projectId string, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
type Client struct {
cfg ClientConfig
ts oauth2.TokenSource
hc *http.Client
logger logr.Logger
}
func NewClient(cfg ClientConfig, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
return &Client{
sendEndpoint: makeSendEndpoint(projectId),
ts: ts,
hc: hc,
logger: logger,
cfg: cfg,
ts: ts,
hc: hc,
logger: logger,
}
}
@ -163,11 +172,7 @@ func (c *Client) Validate(ctx context.Context, message Message) (string, error)
func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabled bool) (SendResponse, error) {
spanAttrs := trace.WithAttributes(
attribute.String("token", req.Message.Token),
)
_, span := tracer.Start(ctx, "Client.doSendRequest", spanAttrs)
_, span := tracer.Start(ctx, "Client.doSendRequest")
defer span.End()
accessToken, err := c.ts.Token()
@ -183,14 +188,11 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
span.RecordError(err)
return SendResponse{}, errors.WithStack(err)
}
span.SetAttributes(
attribute.String("request.body", string(data)),
)
if loggerEnabled {
c.logger.Info("sending", "message", data)
}
request, err := http.NewRequest(http.MethodPost, c.sendEndpoint, bytes.NewReader(data))
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewReader(data))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
@ -202,8 +204,6 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
response, err := c.hc.Do(request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
@ -219,7 +219,7 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
span.SetAttributes(
attribute.Int("response.status_code", response.StatusCode),
attribute.String("response.body", bodyStr),
attribute.String("response_body", bodyStr),
)
if response.StatusCode != http.StatusOK {
@ -240,23 +240,172 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
return resp, nil
}
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
func (c *Client) SendEach(ctx context.Context, messages []Message) (MessageMultiSendResponse, error) {
ctx, span := tracer.Start(ctx, "Client.SendEach")
defer span.End()
resp, err := c.doSendEachInBatch(ctx, messages, false)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
}
if resp.Failed > 0 {
span.SetStatus(codes.Error, "Some notifications not sent.")
}
return resp, nil
return c.doSendEachInBatch(ctx, messages, false)
}
func (c *Client) doSendEachInBatch(
@ -272,12 +421,8 @@ func (c *Client) doSendEachInBatch(
if messageCount == 0 {
return MessageMultiSendResponse{}, nil
}
if messageCount > maxMessages {
err := errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
return MessageMultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
var responses = make([]MessageSendResponse, len(messages))
@ -303,7 +448,6 @@ func (c *Client) doSendEachInBatch(
MessageID: resp,
}
} else {
span.SetStatus(codes.Error, "Some notifications not sent.")
responses[idx] = MessageSendResponse{
Success: false,
Error: err,
@ -314,28 +458,18 @@ func (c *Client) doSendEachInBatch(
// Wait for all Validate/Send calls to finish
wg.Wait()
sentAmount := 0
successCount := 0
for _, r := range responses {
if r.Success {
sentAmount++
successCount++
}
}
failedAmount := len(responses) - sentAmount
span.SetAttributes(
attribute.Int("resp.total", len(responses)),
attribute.Int("resp.sent_amount", sentAmount),
attribute.Int("resp.failed_amount", failedAmount),
)
resp := MessageMultiSendResponse{
return MessageMultiSendResponse{
Responses: responses,
Sent: sentAmount,
Failed: failedAmount,
}
return resp, nil
Sent: successCount,
Failed: len(responses) - successCount,
}, nil
}
type MessageSendResponse struct {

View File

@ -1,14 +1,9 @@
package fcm
// Message to send by Firebase Cloud Messaging Service.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#resource:-message
type Message struct {
// Union field target can be only one of the following:
Token string `json:"token,omitempty"`
Topic string `json:"topic,omitempty"`
Condition string `json:"condition,omitempty"`
// End of list of possible types for union field target.
Token string `json:"token,omitempty"`
Topic string `json:"topic,omitempty"`
Condition string `json:"condition,omitempty"`
Data map[string]string `json:"data,omitempty"`
Notification *Notification `json:"notification,omitempty"`
FcmOptions *FcmOptions `json:"fcm_options,omitempty"`
@ -18,7 +13,6 @@ type Message struct {
}
// Basic notification template to use across all platforms.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#notification
type Notification struct {
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
@ -176,8 +170,8 @@ type ApnsPayload struct {
Aps ApnsPayloadKeys `json:"aps"`
// TODO: add support for custom keys
// https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification
customKeys map[string]any
// https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/generating_a_remote_notification
customKeys map[string]interface{}
}
type ApnsPayloadKeys struct {