Compare commits

...

21 Commits

Author SHA1 Message Date
Владислав Весельский a25fdc474a Do not export func MakeSendEndpoint. 2025-02-05 12:07:52 +03:00
Владислав Весельский c76b172a3a Delete unused type ClientConfig. 2025-02-05 11:50:10 +03:00
Владислав Весельский eb024fd00c Unused field ClientConfig.SendEndpoint deleted. 2025-02-05 11:46:43 +03:00
Владислав Весельский d2b252d82a Use func MakeSendEndpoint. 2025-02-05 11:44:55 +03:00
Владислав Весельский 250cfe9d33 Field Client.cfg deleted. Field Client.sendEndpoint added. 2025-02-05 11:41:38 +03:00
Владислав Весельский 65928f4c0b Save token in span. 2025-02-03 12:05:43 +03:00
Владислав Весельский 678438ace4 Func NewClient. Add param projectId. 2025-02-01 20:13:14 +03:00
Владислав Весельский a1fddd32af Update ref. 2025-01-29 14:34:26 +03:00
Владислав Весельский 5af327af4a Use interface any. 2025-01-29 14:31:13 +03:00
Владислав Весельский 7462961556 Add comments. 2025-01-29 14:28:24 +03:00
Владислав Весельский f0a01461fa Use os.ReadFile(). 2025-01-29 14:13:46 +03:00
Владислав Весельский dd7ee3f703 Use io.ReadAll(). 2025-01-29 14:08:21 +03:00
Владислав Весельский f38f14b926 Add comment. 2025-01-29 11:51:28 +03:00
Владислав Весельский 6f168e543f Move deprecated code to file deprecated.go. 2025-01-29 11:14:44 +03:00
Владислав Весельский 687b3c056f Add comment. 2025-01-29 11:00:21 +03:00
Владислав Весельский bc73d56c8b Add logs. 2025-01-28 17:04:59 +03:00
Владислав Весельский 6bda66b44d Set span status to codes.Error. 2025-01-28 16:45:22 +03:00
Владислав Весельский 0a8bde4947 Output reqest body. 2025-01-28 16:11:21 +03:00
Владислав Весельский e3ee45f895 Add span attribute request_body. 2025-01-28 15:27:58 +03:00
Владислав Весельский 86e7c8815f Set span status to codes.Error. 2025-01-28 15:18:06 +03:00
Владислав Весельский 07f480582b Set span status codes.Error. 2025-01-28 15:07:14 +03:00
3 changed files with 262 additions and 203 deletions

175
deprecated.go Normal file
View File

@ -0,0 +1,175 @@
package fcm
/*
const (
// Batch Send API
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
// Requests to the endpoint will start failing after 6/21/2024.
// Migrate to the standard HTTP v1 API send method, which supports HTTP/2 for multiplexing.
)
type ClientConfig struct {
SendEndpoint string
//BatchSendEndpoint string
}
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
*/

274
fcm.go
View File

@ -10,26 +10,24 @@
package fcm
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"os"
"sync"
"git.bit5.ru/backend/errors"
"github.com/go-logr/logr"
"golang.org/x/oauth2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
const tracerName = "git.bit5.ru/backend/fcm"
@ -43,15 +41,13 @@ const (
maxMessages = 500
multipartBoundary = "msg_boundary"
BatchSendEndpoint = "https://fcm.googleapis.com/batch"
)
var (
AuthScopes = []string{"https://www.googleapis.com/auth/firebase.messaging"}
)
func MakeSendEndpoint(projectId string) string {
func makeSendEndpoint(projectId string) string {
return fmt.Sprintf("https://fcm.googleapis.com/v1/projects/%s/messages:send", projectId)
}
@ -70,7 +66,7 @@ type Credentials struct {
}
func ReadCredentialsFromFile(filename string) (Credentials, error) {
data, err := ioutil.ReadFile(filename)
data, err := os.ReadFile(filename)
if err != nil {
return Credentials{}, errors.WithStack(err)
}
@ -83,24 +79,19 @@ func ReadCredentialsFromFile(filename string) (Credentials, error) {
return c, nil
}
type ClientConfig struct {
SendEndpoint string
BatchSendEndpoint string
}
type Client struct {
cfg ClientConfig
ts oauth2.TokenSource
hc *http.Client
logger logr.Logger
sendEndpoint string
ts oauth2.TokenSource
hc *http.Client
logger logr.Logger
}
func NewClient(cfg ClientConfig, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
func NewClient(projectId string, ts oauth2.TokenSource, hc *http.Client, logger logr.Logger) *Client {
return &Client{
cfg: cfg,
ts: ts,
hc: hc,
logger: logger,
sendEndpoint: makeSendEndpoint(projectId),
ts: ts,
hc: hc,
logger: logger,
}
}
@ -139,9 +130,15 @@ func (c *Client) Send(ctx context.Context, message Message) (string, error) {
ValidateOnly: false,
Message: message,
}
resp, err := c.doSendRequest(ctx, sendRequest, false)
return resp.MessageName, err
resp, err := c.doSendRequest(ctx, sendRequest, false)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return "", err
}
return resp.MessageName, nil
}
func (c *Client) Validate(ctx context.Context, message Message) (string, error) {
@ -153,14 +150,24 @@ func (c *Client) Validate(ctx context.Context, message Message) (string, error)
ValidateOnly: true,
Message: message,
}
resp, err := c.doSendRequest(ctx, sendRequest, false)
return resp.MessageName, err
resp, err := c.doSendRequest(ctx, sendRequest, false)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return "", err
}
return resp.MessageName, nil
}
func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabled bool) (SendResponse, error) {
_, span := tracer.Start(ctx, "Client.doSendRequest")
spanAttrs := trace.WithAttributes(
attribute.String("token", req.Message.Token),
)
_, span := tracer.Start(ctx, "Client.doSendRequest", spanAttrs)
defer span.End()
accessToken, err := c.ts.Token()
@ -176,11 +183,14 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
span.RecordError(err)
return SendResponse{}, errors.WithStack(err)
}
span.SetAttributes(
attribute.String("request.body", string(data)),
)
if loggerEnabled {
c.logger.Info("sending", "message", data)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewReader(data))
request, err := http.NewRequest(http.MethodPost, c.sendEndpoint, bytes.NewReader(data))
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
@ -192,6 +202,8 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
response, err := c.hc.Do(request)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
@ -207,7 +219,7 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
span.SetAttributes(
attribute.Int("response.status_code", response.StatusCode),
attribute.String("response_body", bodyStr),
attribute.String("response.body", bodyStr),
)
if response.StatusCode != http.StatusOK {
@ -228,172 +240,23 @@ func (c *Client) doSendRequest(ctx context.Context, req SendRequest, loggerEnabl
return resp, nil
}
// Deprecated: Use SendEach instead.
func (c *Client) SendMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, false)
}
func (c *Client) ValidateMessages(messages []Message) (MultiSendResponse, error) {
return c.doSendMessages(messages, true)
}
func (c *Client) doSendMessages(messages []Message, validateOnly bool) (MultiSendResponse, error) {
messageCount := len(messages)
if messageCount == 0 {
return MultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
}
accessToken, err := c.ts.Token()
if err != nil {
return MultiSendResponse{}, err
}
var body bytes.Buffer
w := multipart.NewWriter(&body)
w.SetBoundary(multipartBoundary)
for index, msg := range messages {
req := SendRequest{
ValidateOnly: validateOnly,
Message: msg,
}
body, err := c.makeMessageRequest(req)
if err != nil {
return MultiSendResponse{}, err
}
if err := writePartTo(w, body, index); err != nil {
return MultiSendResponse{}, err
}
}
if err := w.Close(); err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.BatchSendEndpoint, &body)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
accessToken.SetAuthHeader(request)
request.Header.Set("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, multipartBoundary))
response, err := c.hc.Do(request)
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
return c.makeMultiSendResponse(response, messageCount)
}
func (c *Client) makeMessageRequest(req SendRequest) ([]byte, error) {
reqJson, err := json.Marshal(req)
if err != nil {
return nil, errors.WithStack(err)
}
request, err := http.NewRequest(http.MethodPost, c.cfg.SendEndpoint, bytes.NewBuffer(reqJson))
if err != nil {
return nil, errors.WithStack(err)
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
request.Header.Set("User-Agent", "")
var body bytes.Buffer
if err := request.Write(&body); err != nil {
return nil, errors.WithStack(err)
}
return body.Bytes(), nil
}
func writePartTo(w *multipart.Writer, bytes []byte, index int) error {
header := make(textproto.MIMEHeader)
header.Set("Content-Type", "application/http")
header.Set("Content-Transfer-Encoding", "binary")
header.Set("Content-ID", fmt.Sprintf("%d", index+1))
part, err := w.CreatePart(header)
if err != nil {
return errors.WithStack(err)
}
if _, err := part.Write(bytes); err != nil {
return errors.WithStack(err)
}
return nil
}
func (c *Client) makeMultiSendResponse(response *http.Response, totalCount int) (MultiSendResponse, error) {
responses := make([]SendResponse, 0, totalCount)
var fails int
_, params, err := mime.ParseMediaType(response.Header.Get("Content-Type"))
if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
reader := multipart.NewReader(response.Body, params["boundary"])
for {
part, err := reader.NextPart()
if err == io.EOF {
break
} else if err != nil {
return MultiSendResponse{}, errors.WithStack(err)
}
resp, err := makeSendResponseFromPart(part)
if err != nil {
return MultiSendResponse{}, err
}
responses = append(responses, resp)
if resp.HasError() {
c.logger.Info("fail", "error", fmt.Sprintf("%+v", *resp.Error))
fails++
}
}
return MultiSendResponse{
Responses: responses,
Sent: totalCount - fails,
Failed: fails,
}, nil
}
func makeSendResponseFromPart(part *multipart.Part) (SendResponse, error) {
response, err := http.ReadResponse(bufio.NewReader(part), nil)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return SendResponse{}, errors.WithStack(err)
}
var resp SendResponse
if err := json.Unmarshal(body, &resp); err != nil {
return SendResponse{}, errors.WithMessagef(err, "response body: %s", body)
}
return resp, nil
}
func (c *Client) SendEach(ctx context.Context, messages []Message) (MessageMultiSendResponse, error) {
ctx, span := tracer.Start(ctx, "Client.SendEach")
defer span.End()
return c.doSendEachInBatch(ctx, messages, false)
resp, err := c.doSendEachInBatch(ctx, messages, false)
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
}
if resp.Failed > 0 {
span.SetStatus(codes.Error, "Some notifications not sent.")
}
return resp, nil
}
func (c *Client) doSendEachInBatch(
@ -409,8 +272,12 @@ func (c *Client) doSendEachInBatch(
if messageCount == 0 {
return MessageMultiSendResponse{}, nil
}
if messageCount > maxMessages {
return MessageMultiSendResponse{}, errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
err := errors.New(fmt.Sprintf("messages limit (%d) exceeded: %d", maxMessages, messageCount))
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
return MessageMultiSendResponse{}, err
}
var responses = make([]MessageSendResponse, len(messages))
@ -436,6 +303,7 @@ func (c *Client) doSendEachInBatch(
MessageID: resp,
}
} else {
span.SetStatus(codes.Error, "Some notifications not sent.")
responses[idx] = MessageSendResponse{
Success: false,
Error: err,
@ -446,18 +314,28 @@ func (c *Client) doSendEachInBatch(
// Wait for all Validate/Send calls to finish
wg.Wait()
successCount := 0
sentAmount := 0
for _, r := range responses {
if r.Success {
successCount++
sentAmount++
}
}
return MessageMultiSendResponse{
failedAmount := len(responses) - sentAmount
span.SetAttributes(
attribute.Int("resp.total", len(responses)),
attribute.Int("resp.sent_amount", sentAmount),
attribute.Int("resp.failed_amount", failedAmount),
)
resp := MessageMultiSendResponse{
Responses: responses,
Sent: successCount,
Failed: len(responses) - successCount,
}, nil
Sent: sentAmount,
Failed: failedAmount,
}
return resp, nil
}
type MessageSendResponse struct {

View File

@ -1,9 +1,14 @@
package fcm
// Message to send by Firebase Cloud Messaging Service.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#resource:-message
type Message struct {
Token string `json:"token,omitempty"`
Topic string `json:"topic,omitempty"`
Condition string `json:"condition,omitempty"`
// Union field target can be only one of the following:
Token string `json:"token,omitempty"`
Topic string `json:"topic,omitempty"`
Condition string `json:"condition,omitempty"`
// End of list of possible types for union field target.
Data map[string]string `json:"data,omitempty"`
Notification *Notification `json:"notification,omitempty"`
FcmOptions *FcmOptions `json:"fcm_options,omitempty"`
@ -13,6 +18,7 @@ type Message struct {
}
// Basic notification template to use across all platforms.
// https://firebase.google.com/docs/reference/fcm/rest/v1/projects.messages#notification
type Notification struct {
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
@ -170,8 +176,8 @@ type ApnsPayload struct {
Aps ApnsPayloadKeys `json:"aps"`
// TODO: add support for custom keys
// https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/generating_a_remote_notification
customKeys map[string]interface{}
// https://developer.apple.com/documentation/usernotifications/generating-a-remote-notification
customKeys map[string]any
}
type ApnsPayloadKeys struct {