matrix-prometheus/vendor/maunium.net/go/mautrix/appservice/message_send_checkpoint.go

166 lines
5.2 KiB
Go
Raw Normal View History

2023-01-13 21:24:39 +01:00
// Copyright (c) 2021 Sumner Evans
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package appservice
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
type MessageSendCheckpointStep string
const (
StepClient MessageSendCheckpointStep = "CLIENT"
StepHomeserver MessageSendCheckpointStep = "HOMESERVER"
StepBridge MessageSendCheckpointStep = "BRIDGE"
StepDecrypted MessageSendCheckpointStep = "DECRYPTED"
StepRemote MessageSendCheckpointStep = "REMOTE"
StepCommand MessageSendCheckpointStep = "COMMAND"
)
type MessageSendCheckpointStatus string
const (
StatusSuccesss MessageSendCheckpointStatus = "SUCCESS"
StatusWillRetry MessageSendCheckpointStatus = "WILL_RETRY"
StatusPermFailure MessageSendCheckpointStatus = "PERM_FAILURE"
StatusUnsupported MessageSendCheckpointStatus = "UNSUPPORTED"
StatusTimeout MessageSendCheckpointStatus = "TIMEOUT"
)
type MessageSendCheckpointReportedBy string
const (
ReportedByAsmux MessageSendCheckpointReportedBy = "ASMUX"
ReportedByBridge MessageSendCheckpointReportedBy = "BRIDGE"
)
type MessageSendCheckpoint struct {
EventID id.EventID `json:"event_id"`
RoomID id.RoomID `json:"room_id"`
Step MessageSendCheckpointStep `json:"step"`
Timestamp int64 `json:"timestamp"`
Status MessageSendCheckpointStatus `json:"status"`
EventType event.Type `json:"event_type"`
ReportedBy MessageSendCheckpointReportedBy `json:"reported_by"`
RetryNum int `json:"retry_num"`
MessageType event.MessageType `json:"message_type,omitempty"`
Info string `json:"info,omitempty"`
}
var CheckpointTypes = map[event.Type]struct{}{
event.EventRedaction: {},
event.EventMessage: {},
event.EventEncrypted: {},
event.EventSticker: {},
event.EventReaction: {},
event.CallInvite: {},
event.CallCandidates: {},
event.CallSelectAnswer: {},
event.CallAnswer: {},
event.CallHangup: {},
event.CallReject: {},
event.CallNegotiate: {},
}
func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, status MessageSendCheckpointStatus, retryNum int) *MessageSendCheckpoint {
checkpoint := MessageSendCheckpoint{
EventID: evt.ID,
RoomID: evt.RoomID,
Step: step,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Status: status,
EventType: evt.Type,
ReportedBy: ReportedByBridge,
RetryNum: retryNum,
}
if evt.Type == event.EventMessage {
checkpoint.MessageType = evt.Content.AsMessage().MsgType
}
return &checkpoint
}
func (as *AppService) SendMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) {
checkpoint := NewMessageSendCheckpoint(evt, step, StatusSuccesss, retryNum)
go checkpoint.Send(as)
}
func (as *AppService) SendErrorMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) {
status := StatusWillRetry
if permanent {
status = StatusPermFailure
}
checkpoint := NewMessageSendCheckpoint(evt, step, status, retryNum)
checkpoint.Info = err.Error()
go checkpoint.Send(as)
}
func (cp *MessageSendCheckpoint) Send(as *AppService) {
err := SendCheckpoints(as, []*MessageSendCheckpoint{cp})
if err != nil {
as.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err)
}
}
type CheckpointsJSON struct {
Checkpoints []*MessageSendCheckpoint `json:"checkpoints"`
}
func SendCheckpoints(as *AppService, checkpoints []*MessageSendCheckpoint) error {
checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints}
if as.HasWebsocket() {
return as.SendWebsocket(&WebsocketRequest{
Command: "message_checkpoint",
Data: checkpointsJSON,
})
}
if as.MessageSendCheckpointEndpoint == "" {
return nil
}
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(checkpointsJSON); err != nil {
return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, as.MessageSendCheckpointEndpoint, &body)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+as.Registration.AppToken)
req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender")
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed to send bridge state update: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
respBody, _ := ioutil.ReadAll(resp.Body)
if respBody != nil {
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
}
return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
}
return nil
}