166 lines
5.2 KiB
Go
166 lines
5.2 KiB
Go
// Copyright (c) 2021 Sumner Evans
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package appservice
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"time"
|
|
|
|
"maunium.net/go/mautrix"
|
|
"maunium.net/go/mautrix/event"
|
|
"maunium.net/go/mautrix/id"
|
|
)
|
|
|
|
type MessageSendCheckpointStep string
|
|
|
|
const (
|
|
StepClient MessageSendCheckpointStep = "CLIENT"
|
|
StepHomeserver MessageSendCheckpointStep = "HOMESERVER"
|
|
StepBridge MessageSendCheckpointStep = "BRIDGE"
|
|
StepDecrypted MessageSendCheckpointStep = "DECRYPTED"
|
|
StepRemote MessageSendCheckpointStep = "REMOTE"
|
|
StepCommand MessageSendCheckpointStep = "COMMAND"
|
|
)
|
|
|
|
type MessageSendCheckpointStatus string
|
|
|
|
const (
|
|
StatusSuccesss MessageSendCheckpointStatus = "SUCCESS"
|
|
StatusWillRetry MessageSendCheckpointStatus = "WILL_RETRY"
|
|
StatusPermFailure MessageSendCheckpointStatus = "PERM_FAILURE"
|
|
StatusUnsupported MessageSendCheckpointStatus = "UNSUPPORTED"
|
|
StatusTimeout MessageSendCheckpointStatus = "TIMEOUT"
|
|
)
|
|
|
|
type MessageSendCheckpointReportedBy string
|
|
|
|
const (
|
|
ReportedByAsmux MessageSendCheckpointReportedBy = "ASMUX"
|
|
ReportedByBridge MessageSendCheckpointReportedBy = "BRIDGE"
|
|
)
|
|
|
|
type MessageSendCheckpoint struct {
|
|
EventID id.EventID `json:"event_id"`
|
|
RoomID id.RoomID `json:"room_id"`
|
|
Step MessageSendCheckpointStep `json:"step"`
|
|
Timestamp int64 `json:"timestamp"`
|
|
Status MessageSendCheckpointStatus `json:"status"`
|
|
EventType event.Type `json:"event_type"`
|
|
ReportedBy MessageSendCheckpointReportedBy `json:"reported_by"`
|
|
RetryNum int `json:"retry_num"`
|
|
MessageType event.MessageType `json:"message_type,omitempty"`
|
|
Info string `json:"info,omitempty"`
|
|
}
|
|
|
|
var CheckpointTypes = map[event.Type]struct{}{
|
|
event.EventRedaction: {},
|
|
event.EventMessage: {},
|
|
event.EventEncrypted: {},
|
|
event.EventSticker: {},
|
|
event.EventReaction: {},
|
|
event.CallInvite: {},
|
|
event.CallCandidates: {},
|
|
event.CallSelectAnswer: {},
|
|
event.CallAnswer: {},
|
|
event.CallHangup: {},
|
|
event.CallReject: {},
|
|
event.CallNegotiate: {},
|
|
}
|
|
|
|
func NewMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, status MessageSendCheckpointStatus, retryNum int) *MessageSendCheckpoint {
|
|
checkpoint := MessageSendCheckpoint{
|
|
EventID: evt.ID,
|
|
RoomID: evt.RoomID,
|
|
Step: step,
|
|
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
|
|
Status: status,
|
|
EventType: evt.Type,
|
|
ReportedBy: ReportedByBridge,
|
|
RetryNum: retryNum,
|
|
}
|
|
if evt.Type == event.EventMessage {
|
|
checkpoint.MessageType = evt.Content.AsMessage().MsgType
|
|
}
|
|
return &checkpoint
|
|
}
|
|
|
|
func (as *AppService) SendMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, retryNum int) {
|
|
checkpoint := NewMessageSendCheckpoint(evt, step, StatusSuccesss, retryNum)
|
|
go checkpoint.Send(as)
|
|
}
|
|
|
|
func (as *AppService) SendErrorMessageSendCheckpoint(evt *event.Event, step MessageSendCheckpointStep, err error, permanent bool, retryNum int) {
|
|
status := StatusWillRetry
|
|
if permanent {
|
|
status = StatusPermFailure
|
|
}
|
|
checkpoint := NewMessageSendCheckpoint(evt, step, status, retryNum)
|
|
checkpoint.Info = err.Error()
|
|
go checkpoint.Send(as)
|
|
}
|
|
|
|
func (cp *MessageSendCheckpoint) Send(as *AppService) {
|
|
err := SendCheckpoints(as, []*MessageSendCheckpoint{cp})
|
|
if err != nil {
|
|
as.Log.Warnfln("Error sending checkpoint %s/%s for %s: %v", cp.Step, cp.Status, cp.EventID, err)
|
|
}
|
|
}
|
|
|
|
type CheckpointsJSON struct {
|
|
Checkpoints []*MessageSendCheckpoint `json:"checkpoints"`
|
|
}
|
|
|
|
func SendCheckpoints(as *AppService, checkpoints []*MessageSendCheckpoint) error {
|
|
checkpointsJSON := CheckpointsJSON{Checkpoints: checkpoints}
|
|
|
|
if as.HasWebsocket() {
|
|
return as.SendWebsocket(&WebsocketRequest{
|
|
Command: "message_checkpoint",
|
|
Data: checkpointsJSON,
|
|
})
|
|
}
|
|
|
|
if as.MessageSendCheckpointEndpoint == "" {
|
|
return nil
|
|
}
|
|
|
|
var body bytes.Buffer
|
|
if err := json.NewEncoder(&body).Encode(checkpointsJSON); err != nil {
|
|
return fmt.Errorf("failed to encode message send checkpoint JSON: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, as.MessageSendCheckpointEndpoint, &body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Bearer "+as.Registration.AppToken)
|
|
req.Header.Set("User-Agent", mautrix.DefaultUserAgent+" checkpoint sender")
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send bridge state update: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode < 200 || resp.StatusCode > 299 {
|
|
respBody, _ := ioutil.ReadAll(resp.Body)
|
|
if respBody != nil {
|
|
respBody = bytes.ReplaceAll(respBody, []byte("\n"), []byte("\\n"))
|
|
}
|
|
return fmt.Errorf("unexpected status code %d sending bridge state update: %s", resp.StatusCode, respBody)
|
|
}
|
|
return nil
|
|
}
|