few path changes and added fix from master
This commit is contained in:
92
app/deliver/deliver.go
Normal file
92
app/deliver/deliver.go
Normal file
@ -0,0 +1,92 @@
|
||||
package deliver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/RichardKnop/machinery/v1"
|
||||
"github.com/RichardKnop/machinery/v1/log"
|
||||
"github.com/go-redis/redis"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/yukimochi/Activity-Relay/models"
|
||||
)
|
||||
|
||||
var (
|
||||
version string
|
||||
globalConfig *models.RelayConfig
|
||||
|
||||
// Actor : Relay's Actor
|
||||
Actor models.Actor
|
||||
|
||||
redisClient *redis.Client
|
||||
machineryServer *machinery.Server
|
||||
httpClient *http.Client
|
||||
)
|
||||
|
||||
func relayActivity(args ...string) error {
|
||||
inboxURL := args[0]
|
||||
body := args[1]
|
||||
err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey())
|
||||
if err != nil {
|
||||
domain, _ := url.Parse(inboxURL)
|
||||
evalScript := "local change = redis.call('HSETNX',KEYS[1], 'last_error', ARGV[1]); if change == 1 then redis.call('EXPIRE', KEYS[1], ARGV[2]) end;"
|
||||
redisClient.Eval(evalScript, []string{"relay:statistics:" + domain.Host}, err.Error(), 60).Result()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func registerActivity(args ...string) error {
|
||||
inboxURL := args[0]
|
||||
body := args[1]
|
||||
err := sendActivity(inboxURL, Actor.ID, []byte(body), globalConfig.ActorKey())
|
||||
return err
|
||||
}
|
||||
|
||||
func Entrypoint(g *models.RelayConfig, v string) error {
|
||||
var err error
|
||||
globalConfig = g
|
||||
version = v
|
||||
|
||||
err = initialize(globalConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = machineryServer.RegisterTask("register", registerActivity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = machineryServer.RegisterTask("relay", relayActivity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
workerID := uuid.NewV4()
|
||||
worker := machineryServer.NewWorker(workerID.String(), globalConfig.JobConcurrency())
|
||||
err = worker.Launch()
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func initialize(globalConfig *models.RelayConfig) error {
|
||||
var err error
|
||||
|
||||
redisClient = globalConfig.RedisClient()
|
||||
|
||||
machineryServer, err = models.NewMachineryServer(globalConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
httpClient = &http.Client{Timeout: time.Duration(5) * time.Second}
|
||||
|
||||
Actor = models.NewActivityPubActorFromSelfKey(globalConfig)
|
||||
newNullLogger := NewNullLogger()
|
||||
log.DEBUG = newNullLogger
|
||||
|
||||
return nil
|
||||
}
|
140
app/deliver/deliver_test.go
Normal file
140
app/deliver/deliver_test.go
Normal file
@ -0,0 +1,140 @@
|
||||
package deliver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
"github.com/yukimochi/Activity-Relay/models"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
var err error
|
||||
|
||||
testConfigPath := "../misc/test/config.yml"
|
||||
file, _ := os.Open(testConfigPath)
|
||||
defer file.Close()
|
||||
|
||||
viper.SetConfigType("yaml")
|
||||
viper.ReadConfig(file)
|
||||
viper.Set("ACTOR_PEM", "../misc/test/testKey.pem")
|
||||
viper.BindEnv("REDIS_URL")
|
||||
|
||||
globalConfig, err = models.NewRelayConfig()
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = initialize(globalConfig)
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
redisClient.FlushAll().Result()
|
||||
code := m.Run()
|
||||
os.Exit(code)
|
||||
}
|
||||
|
||||
func TestRelayActivity(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
data, _ := ioutil.ReadAll(r.Body)
|
||||
if string(data) != "data" || r.Header.Get("Content-Type") != "application/activity+json" {
|
||||
w.WriteHeader(500)
|
||||
w.Write(nil)
|
||||
} else {
|
||||
w.WriteHeader(202)
|
||||
w.Write(nil)
|
||||
}
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := relayActivity(s.URL, "data")
|
||||
if err != nil {
|
||||
t.Fatal("Failed - Data transfer not collect")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayActivityNoHost(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := relayActivity("http://nohost.example.jp", "data")
|
||||
if err == nil {
|
||||
t.Fatal("Failed - Error not reported.")
|
||||
}
|
||||
domain, _ := url.Parse("http://nohost.example.jp")
|
||||
data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
|
||||
if data == "" {
|
||||
t.Fatal("Failed - Error not cached.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRelayActivityResp500(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
w.Write(nil)
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := relayActivity(s.URL, "data")
|
||||
if err == nil {
|
||||
t.Fatal("Failed - Error not reported.")
|
||||
}
|
||||
domain, _ := url.Parse(s.URL)
|
||||
data, _ := redisClient.HGet("relay:statistics:"+domain.Host, "last_error").Result()
|
||||
if data == "" {
|
||||
t.Fatal("Failed - Error not cached.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterActivity(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
data, _ := ioutil.ReadAll(r.Body)
|
||||
if string(data) != "data" || r.Header.Get("Content-Type") != "application/activity+json" {
|
||||
w.WriteHeader(500)
|
||||
w.Write(nil)
|
||||
} else {
|
||||
w.WriteHeader(202)
|
||||
w.Write(nil)
|
||||
}
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := registerActivity(s.URL, "data")
|
||||
if err != nil {
|
||||
t.Fatal("Failed - Data transfer not collect")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterActivityNoHost(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := registerActivity("http://nohost.example.jp", "data")
|
||||
if err == nil {
|
||||
t.Fatal("Failed - Error not reported.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegisterActivityResp500(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(500)
|
||||
w.Write(nil)
|
||||
}))
|
||||
defer s.Close()
|
||||
|
||||
err := registerActivity(s.URL, "data")
|
||||
if err == nil {
|
||||
t.Fatal("Failed - Error not reported.")
|
||||
}
|
||||
}
|
30
app/deliver/logger.go
Normal file
30
app/deliver/logger.go
Normal file
@ -0,0 +1,30 @@
|
||||
package deliver
|
||||
|
||||
// NullLogger : Null logger for debug output
|
||||
type NullLogger struct {
|
||||
}
|
||||
|
||||
// NewNullLogger : Create NullLogger
|
||||
func NewNullLogger() *NullLogger {
|
||||
var newNullLogger NullLogger
|
||||
return &newNullLogger
|
||||
}
|
||||
|
||||
func (l *NullLogger) Print(v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Printf(format string, v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Println(v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Fatal(v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Fatalf(format string, v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Fatalln(v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Panic(v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Panicf(format string, v ...interface{}) {
|
||||
}
|
||||
func (l *NullLogger) Panicln(v ...interface{}) {
|
||||
}
|
53
app/deliver/sender.go
Normal file
53
app/deliver/sender.go
Normal file
@ -0,0 +1,53 @@
|
||||
package deliver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
"strings"
|
||||
|
||||
httpdate "github.com/Songmu/go-httpdate"
|
||||
"github.com/go-fed/httpsig"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// See https://github.com/mastodon/mastodon/pull/14556
|
||||
const ONEHOUR = 60 * 60
|
||||
|
||||
func appendSignature(request *http.Request, body *[]byte, KeyID string, privateKey *rsa.PrivateKey) error {
|
||||
request.Header.Set("Host", request.Host)
|
||||
request.Header.Set("(request-target)", fmt.Sprintf("%s %s", strings.ToLower(request.Method), request.URL.Path))
|
||||
|
||||
signer, _, err := httpsig.NewSigner([]httpsig.Algorithm{httpsig.RSA_SHA256}, httpsig.DigestSha256, []string{httpsig.RequestTarget, "Host", "Date", "Digest", "Content-Type"}, httpsig.Signature, ONEHOUR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = signer.SignRequest(privateKey, KeyID, request, *body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func sendActivity(inboxURL string, KeyID string, body []byte, privateKey *rsa.PrivateKey) error {
|
||||
req, _ := http.NewRequest("POST", inboxURL, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/activity+json")
|
||||
req.Header.Set("User-Agent", fmt.Sprintf("%s (golang net/http; Activity-Relay %s; %s)", globalConfig.ServerServiceName(), version, globalConfig.ServerHostname().Host))
|
||||
req.Header.Set("Date", httpdate.Time2Str(time.Now()))
|
||||
appendSignature(req, &body, KeyID, privateKey)
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
logrus.Debug(inboxURL, " ", resp.StatusCode)
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return errors.New("Post " + inboxURL + ": " + resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
58
app/deliver/sender_test.go
Normal file
58
app/deliver/sender_test.go
Normal file
@ -0,0 +1,58 @@
|
||||
package deliver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rsa"
|
||||
"crypto/sha256"
|
||||
"crypto/x509"
|
||||
"encoding/base64"
|
||||
"encoding/pem"
|
||||
"github.com/Songmu/go-httpdate"
|
||||
"github.com/go-fed/httpsig"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func generatePublicKeyPEMString(publicKey *rsa.PublicKey) string {
|
||||
publicKeyByte := x509.MarshalPKCS1PublicKey(publicKey)
|
||||
publicKeyPem := pem.EncodeToMemory(
|
||||
&pem.Block{
|
||||
Type: "RSA PUBLIC KEY",
|
||||
Bytes: publicKeyByte,
|
||||
},
|
||||
)
|
||||
return string(publicKeyPem)
|
||||
}
|
||||
|
||||
func TestAppendSignature(t *testing.T) {
|
||||
file, _ := os.Open("../misc/test/create.json")
|
||||
body, _ := ioutil.ReadAll(file)
|
||||
req, _ := http.NewRequest("POST", "https://localhost", bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/activity+json")
|
||||
req.Header.Set("Date", httpdate.Time2Str(time.Now()))
|
||||
appendSignature(req, &body, "https://innocent.yukimochi.io/users/YUKIMOCHI#main-key", globalConfig.ActorKey())
|
||||
|
||||
// Verify HTTPSignature
|
||||
verifier, err := httpsig.NewVerifier(req)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed - " + err.Error())
|
||||
}
|
||||
err = verifier.Verify(globalConfig.ActorKey().Public(), httpsig.RSA_SHA256)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed - " + err.Error())
|
||||
}
|
||||
|
||||
// Verify Digest
|
||||
givenDigest := req.Header.Get("Digest")
|
||||
hash := sha256.New()
|
||||
hash.Write(body)
|
||||
b := hash.Sum(nil)
|
||||
calculatedDigest := "SHA-256=" + base64.StdEncoding.EncodeToString(b)
|
||||
|
||||
if givenDigest != calculatedDigest {
|
||||
t.Fatalf("Failed - " + err.Error())
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user