blob: 947f03fda5a8bb0b10b51a865e18ea86d29562f2 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package internal
import (
"io"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security"
"messenger/ifc"
)
// messenger implements the Messenger and MessageRepository interfaces.
type Messenger struct {
Params Params
Notifier *PubSub
Counters *Counters
}
func (m *Messenger) Diff(ctx *context.T, call ifc.MessengerDiffServerCall) error {
ctx.Infof("Diff() called")
if err := m.Params.RateAclIn.Authorize(ctx, call.Security()); err != nil {
return err
}
for call.RecvStream().Advance() {
ids := call.RecvStream().Value()
responses := make([]bool, len(ids))
for i, id := range ids {
responses[i] = m.Params.Store.Exists(ctx, id)
}
if err := call.SendStream().Send(responses); err != nil {
return err
}
}
return nil
}
func (m *Messenger) Push(ctx *context.T, call ifc.MessengerPushServerCall, msg ifc.Message, offset int64) error {
b, r := security.RemoteBlessingNames(ctx, call.Security())
var cp security.CallParams
cp.Copy(call.Security())
cp.RemoteBlessings = msg.SenderBlessings
cp.RemoteDischarges = msg.SenderDischarges
senderCall := security.NewCall(&cp)
sb, sr := security.RemoteBlessingNames(ctx, senderCall)
ctx.Infof("Push() Caller Blessing Names %q, rejected %q.", b, r)
ctx.Infof(" Sender Blessing Names %q, rejected %q.", sb, sr)
if err := m.Params.RateAclIn.Authorize(ctx, call.Security()); err != nil {
return err
}
if err := m.Params.RateAclSender.Authorize(ctx, senderCall); err != nil {
return err
}
if err := msg.Validate(ctx); err != nil {
return err
}
if m.Params.MaxHops > 0 && len(msg.Hops) > m.Params.MaxHops {
return ifc.NewErrTooManyHops(ctx, int32(m.Params.MaxHops))
}
if m.Params.MaxMessageLength > 0 && msg.Length > m.Params.MaxMessageLength {
return ifc.NewErrTooBig(ctx, m.Params.MaxMessageLength)
}
w, err := m.Params.Store.OpenWrite(ctx, msg, offset)
if err != nil {
return err
}
size := int64(0)
for call.RecvStream().Advance() {
packet := call.RecvStream().Value()
if _, err := w.Write(packet); err != nil {
w.Close()
return err
}
size += int64(len(packet))
m.Counters.numBytesReceived.Incr(int64(len(packet)))
}
if err := call.RecvStream().Err(); err != nil && size != msg.Length {
// The stream was interrupted. We should keep the data that was
// transferred to allow the transfer to be resumed later.
w.Close()
return err
}
if err := w.Close(); err != nil {
return err
}
m.Counters.numMessagesReceived.Incr(1)
if m.Notifier != nil {
m.Notifier.Pub(msg)
}
return nil
}
func (m *Messenger) ResumeOffset(ctx *context.T, call rpc.ServerCall, msg ifc.Message) (offset int64, err error) {
ctx.Infof("ResumeOffset() called")
if err := m.Params.RateAclIn.Authorize(ctx, call.Security()); err != nil {
return 0, err
}
return m.Params.Store.Offset(ctx, msg.Id)
}
func (m *Messenger) Manifest(ctx *context.T, call ifc.MessageRepositoryManifestServerCall) error {
ctx.Infof("Manifest() called")
if err := m.Params.RateAclIn.Authorize(ctx, call.Security()); err != nil {
return err
}
ch, err := m.Params.Store.Manifest(ctx)
if err != nil {
return err
}
for msg := range ch {
call.SendStream().Send(*msg)
}
return nil
}
func (m *Messenger) Pull(ctx *context.T, call ifc.MessageRepositoryPullServerCall, id string, offset int64) (ifc.Message, error) {
ctx.Infof("Pull() called")
if err := m.Params.RateAclIn.Authorize(ctx, call.Security()); err != nil {
return ifc.Message{}, err
}
msg, r, err := m.Params.Store.OpenRead(ctx, id)
if err != nil {
return ifc.Message{}, err
}
if _, err := r.Seek(offset, 0); err != nil {
return ifc.Message{}, err
}
buf := make([]byte, 2048)
for {
n, err := r.Read(buf)
if n > 0 {
if err := call.SendStream().Send(buf[:n]); err != nil {
return ifc.Message{}, err
}
}
if err == io.EOF {
break
}
if err != nil {
return ifc.Message{}, err
}
}
return msg, nil
}