diff options
60 files changed, 3096 insertions, 492 deletions
diff --git a/Gopkg.lock b/Gopkg.lock index 086711ddc..c46e2bfa1 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -16,19 +16,20 @@ [[projects]] name = "github.com/Shopify/sarama" packages = ["."] - revision = "bbdbe644099b7fdc8327d5cc69c030945188b2e9" - version = "v1.13.0" + revision = "240fd146ce68bcafb034cc5dc977229ffbafa8ea" + version = "v1.14.0" [[projects]] + branch = "master" name = "github.com/apache/thrift" packages = ["lib/go/thrift"] - revision = "4f77ab8e296d64c57e6ea1c6e3f0f152bc7d6a3a" + revision = "95d5fb3a1e38125b9eabcbe9cda1a6c7bbe3e93d" [[projects]] name = "github.com/asaskevich/govalidator" packages = ["."] - revision = "73945b6115bfbbcc57d89b7316e28109364124e1" - version = "v7" + revision = "521b25f4b05fd26bec69d9dedeb8f9c9a83939a8" + version = "v8" [[projects]] branch = "master" @@ -160,7 +161,7 @@ branch = "master" name = "github.com/go-openapi/swag" packages = ["."] - revision = "f3f9494671f93fcff853e3c6e9e948b3eb71e590" + revision = "cf0bdb963811675a4d7e74901cefc7411a1df939" [[projects]] name = "github.com/gogo/protobuf" @@ -262,6 +263,7 @@ name = "github.com/openzipkin/zipkin-go-opentracing" packages = [".","flag","thrift/gen-go/scribe","thrift/gen-go/zipkincore","types","wire"] revision = "45e90b00710a4c34a1a7d8a78d90f9b010b0bd4d" + version = "v0.3.2" [[projects]] name = "github.com/pierrec/lz4" @@ -326,7 +328,7 @@ branch = "master" name = "golang.org/x/sys" packages = ["unix","windows"] - revision = "75813c647272dd855bda156405bf844a5414f5bf" + revision = "1e2299c37cc91a509f1b12369872d27be0ce98a6" [[projects]] branch = "master" @@ -378,6 +380,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "c7279ef091bb11a42d1421f51e53d761113ea23d9e9b993823605883da0f80ff" + inputs-digest = "be9300a30414c93aa44756868a7906a0a295b0910a662880741bcfac58b7b679" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 3a9d9a644..5f6fc8b54 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -13,12 +13,9 @@ ignored = [ "golang.org/x/net/trace", ] -[[constraint]] - name = "github.com/openzipkin/zipkin-go-opentracing" - revision = "45e90b00710a4c34a1a7d8a78d90f9b010b0bd4d" [[override]] name = "github.com/apache/thrift" - revision = "4f77ab8e296d64c57e6ea1c6e3f0f152bc7d6a3a" + branch = "master" [[override]] name = "github.com/ugorji/go" diff --git a/vendor/github.com/Shopify/sarama/.gitignore b/vendor/github.com/Shopify/sarama/.gitignore index 3591f9ff3..c6c482dca 100644 --- a/vendor/github.com/Shopify/sarama/.gitignore +++ b/vendor/github.com/Shopify/sarama/.gitignore @@ -22,3 +22,5 @@ _cgo_export.* _testmain.go *.exe + +coverage.txt diff --git a/vendor/github.com/Shopify/sarama/.travis.yml b/vendor/github.com/Shopify/sarama/.travis.yml index 8e5a91ab0..7a32900cc 100644 --- a/vendor/github.com/Shopify/sarama/.travis.yml +++ b/vendor/github.com/Shopify/sarama/.travis.yml @@ -31,4 +31,7 @@ script: - make errcheck - make fmt +after_success: + - bash <(curl -s https://codecov.io/bash) + sudo: false diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md index 5f65cb8c8..028a18033 100644 --- a/vendor/github.com/Shopify/sarama/CHANGELOG.md +++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +#### Version 1.14.0 (2017-11-13) + +New Features: + - Add support for the new Kafka 0.11 record-batch format, including the wire + protocol and the necessary behavioural changes in the producer and consumer. + Transactions and idempotency are not yet supported, but producing and + consuming should work with all the existing bells and whistles (batching, + compression, etc) as well as the new custom headers. Thanks to Vlad Hanciuta + of Arista Networks for this work. Part of + ([#901](https://github.com/Shopify/sarama/issues/901)). + +Bug Fixes: + - Fix encoding of ProduceResponse versions in test + ([#970](https://github.com/Shopify/sarama/pull/970)). + - Return partial replicas list when we have it + ([#975](https://github.com/Shopify/sarama/pull/975)). + #### Version 1.13.0 (2017-10-04) New Features: diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile index 626b09a54..58a39e4f3 100644 --- a/vendor/github.com/Shopify/sarama/Makefile +++ b/vendor/github.com/Shopify/sarama/Makefile @@ -1,7 +1,15 @@ default: fmt vet errcheck test +# Taken from https://github.com/codecov/example-go#caveat-multiple-files test: - go test -v -timeout 60s -race ./... + echo "" > coverage.txt + for d in `go list ./... | grep -v vendor`; do \ + go test -v -timeout 60s -race -coverprofile=profile.out -covermode=atomic $$d; \ + if [ -f profile.out ]; then \ + cat profile.out >> coverage.txt; \ + rm profile.out; \ + fi \ + done vet: go vet ./... diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md index 47a9bda52..f52af5729 100644 --- a/vendor/github.com/Shopify/sarama/README.md +++ b/vendor/github.com/Shopify/sarama/README.md @@ -3,6 +3,7 @@ sarama [](https://godoc.org/github.com/Shopify/sarama) [](https://travis-ci.org/Shopify/sarama) +[](https://codecov.io/gh/Shopify/sarama) Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apache.org/) version 0.8 (and later). diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go index 6d71a6d8f..1eff81cbf 100644 --- a/vendor/github.com/Shopify/sarama/async_producer.go +++ b/vendor/github.com/Shopify/sarama/async_producer.go @@ -1,6 +1,7 @@ package sarama import ( + "encoding/binary" "fmt" "sync" "time" @@ -119,6 +120,10 @@ type ProducerMessage struct { // StringEncoder and ByteEncoder. Value Encoder + // The headers are key-value pairs that are transparently passed + // by Kafka between producers and consumers. + Headers []RecordHeader + // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for @@ -146,8 +151,16 @@ type ProducerMessage struct { const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. -func (m *ProducerMessage) byteSize() int { - size := producerMessageOverhead +func (m *ProducerMessage) byteSize(version int) int { + var size int + if version >= 2 { + size = maximumRecordOverhead + for _, h := range m.Headers { + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } else { + size = producerMessageOverhead + } if m.Key != nil { size += m.Key.Length() } @@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() { p.inFlight.Add(1) } - if msg.byteSize() > p.conf.Producer.MaxMessageBytes { + version := 1 + if p.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) continue } diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go index 570f7f3f3..3dbfc4b06 100644 --- a/vendor/github.com/Shopify/sarama/client.go +++ b/vendor/github.com/Shopify/sarama/client.go @@ -49,9 +49,9 @@ type Client interface { RefreshMetadata(topics ...string) error // GetOffset queries the cluster to get the most recent available offset at the - // given time on the topic/partition combination. Time should be OffsetOldest for - // the earliest available offset, OffsetNewest for the offset of the message that - // will be produced next, or a time. + // given time (in milliseconds) on the topic/partition combination. + // Time should be OffsetOldest for the earliest available offset, + // OffsetNewest for the offset of the message that will be produced next, or a time. GetOffset(topic string, partitionID int32, time int64) (int64, error) // Coordinator returns the coordinating broker for a consumer group. It will @@ -297,7 +297,7 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) } if metadata.Err == ErrReplicaNotAvailable { - return nil, metadata.Err + return dupInt32Slice(metadata.Replicas), metadata.Err } return dupInt32Slice(metadata.Replicas), nil } @@ -322,7 +322,7 @@ func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, } if metadata.Err == ErrReplicaNotAvailable { - return nil, metadata.Err + return dupInt32Slice(metadata.Isr), metadata.Err } return dupInt32Slice(metadata.Isr), nil } diff --git a/vendor/github.com/Shopify/sarama/config_test.go b/vendor/github.com/Shopify/sarama/config_test.go index 5fef6b361..40aa453a9 100644 --- a/vendor/github.com/Shopify/sarama/config_test.go +++ b/vendor/github.com/Shopify/sarama/config_test.go @@ -33,6 +33,169 @@ func TestEmptyClientIDConfigValidates(t *testing.T) { } } +func TestNetConfigValidates(t *testing.T) { + tests := []struct { + name string + cfg func(*Config) // resorting to using a function as a param because of internal composite structs + err string + }{ + { + "OpenRequests", + func(cfg *Config) { + cfg.Net.MaxOpenRequests = 0 + }, + "Net.MaxOpenRequests must be > 0"}, + {"DialTimeout", + func(cfg *Config) { + cfg.Net.DialTimeout = 0 + }, + "Net.DialTimeout must be > 0"}, + {"ReadTimeout", + func(cfg *Config) { + cfg.Net.ReadTimeout = 0 + }, + "Net.ReadTimeout must be > 0"}, + {"WriteTimeout", + func(cfg *Config) { + cfg.Net.WriteTimeout = 0 + }, + "Net.WriteTimeout must be > 0"}, + {"KeepAlive", + func(cfg *Config) { + cfg.Net.KeepAlive = -1 + }, + "Net.KeepAlive must be >= 0"}, + {"SASL.User", + func(cfg *Config) { + cfg.Net.SASL.Enable = true + cfg.Net.SASL.User = "" + }, + "Net.SASL.User must not be empty when SASL is enabled"}, + {"SASL.Password", + func(cfg *Config) { + cfg.Net.SASL.Enable = true + cfg.Net.SASL.User = "user" + cfg.Net.SASL.Password = "" + }, + "Net.SASL.Password must not be empty when SASL is enabled"}, + } + + for i, test := range tests { + c := NewConfig() + test.cfg(c) + if err := c.Validate(); string(err.(ConfigurationError)) != test.err { + t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) + } + } +} + +func TestMetadataConfigValidates(t *testing.T) { + tests := []struct { + name string + cfg func(*Config) // resorting to using a function as a param because of internal composite structs + err string + }{ + { + "Retry.Max", + func(cfg *Config) { + cfg.Metadata.Retry.Max = -1 + }, + "Metadata.Retry.Max must be >= 0"}, + {"Retry.Backoff", + func(cfg *Config) { + cfg.Metadata.Retry.Backoff = -1 + }, + "Metadata.Retry.Backoff must be >= 0"}, + {"RefreshFrequency", + func(cfg *Config) { + cfg.Metadata.RefreshFrequency = -1 + }, + "Metadata.RefreshFrequency must be >= 0"}, + } + + for i, test := range tests { + c := NewConfig() + test.cfg(c) + if err := c.Validate(); string(err.(ConfigurationError)) != test.err { + t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) + } + } +} + +func TestProducerConfigValidates(t *testing.T) { + tests := []struct { + name string + cfg func(*Config) // resorting to using a function as a param because of internal composite structs + err string + }{ + { + "MaxMessageBytes", + func(cfg *Config) { + cfg.Producer.MaxMessageBytes = 0 + }, + "Producer.MaxMessageBytes must be > 0"}, + {"RequiredAcks", + func(cfg *Config) { + cfg.Producer.RequiredAcks = -2 + }, + "Producer.RequiredAcks must be >= -1"}, + {"Timeout", + func(cfg *Config) { + cfg.Producer.Timeout = 0 + }, + "Producer.Timeout must be > 0"}, + {"Partitioner", + func(cfg *Config) { + cfg.Producer.Partitioner = nil + }, + "Producer.Partitioner must not be nil"}, + {"Flush.Bytes", + func(cfg *Config) { + cfg.Producer.Flush.Bytes = -1 + }, + "Producer.Flush.Bytes must be >= 0"}, + {"Flush.Messages", + func(cfg *Config) { + cfg.Producer.Flush.Messages = -1 + }, + "Producer.Flush.Messages must be >= 0"}, + {"Flush.Frequency", + func(cfg *Config) { + cfg.Producer.Flush.Frequency = -1 + }, + "Producer.Flush.Frequency must be >= 0"}, + {"Flush.MaxMessages", + func(cfg *Config) { + cfg.Producer.Flush.MaxMessages = -1 + }, + "Producer.Flush.MaxMessages must be >= 0"}, + {"Flush.MaxMessages with Producer.Flush.Messages", + func(cfg *Config) { + cfg.Producer.Flush.MaxMessages = 1 + cfg.Producer.Flush.Messages = 2 + }, + "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"}, + {"Flush.Retry.Max", + func(cfg *Config) { + cfg.Producer.Retry.Max = -1 + }, + "Producer.Retry.Max must be >= 0"}, + {"Flush.Retry.Backoff", + func(cfg *Config) { + cfg.Producer.Retry.Backoff = -1 + }, + "Producer.Retry.Backoff must be >= 0"}, + } + + for i, test := range tests { + c := NewConfig() + test.cfg(c) + if err := c.Validate(); string(err.(ConfigurationError)) != test.err { + t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) + } + } +} + func TestLZ4ConfigValidation(t *testing.T) { config := NewConfig() config.Producer.Compression = CompressionLZ4 diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go index 2ce69b00b..cb6f031d0 100644 --- a/vendor/github.com/Shopify/sarama/consumer.go +++ b/vendor/github.com/Shopify/sarama/consumer.go @@ -14,8 +14,9 @@ type ConsumerMessage struct { Topic string Partition int32 Offset int64 - Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp - BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp + Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp + BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp + Headers []*RecordHeader // only set if kafka is version 0.11+ } // ConsumerError is what is provided to the user when an error occurs. @@ -478,44 +479,12 @@ feederLoop: close(child.errors) } -func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { - block := response.GetBlock(child.topic, child.partition) - if block == nil { - return nil, ErrIncompleteResponse - } - - if block.Err != ErrNoError { - return nil, block.Err - } - - if len(block.MsgSet.Messages) == 0 { - // We got no messages. If we got a trailing one then we need to ask for more data. - // Otherwise we just poll again and wait for one to be produced... - if block.MsgSet.PartialTrailingMessage { - if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max { - // we can't ask for more data, we've hit the configured limit - child.sendError(ErrMessageTooLarge) - child.offset++ // skip this one so we can keep processing future messages - } else { - child.fetchSize *= 2 - if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max { - child.fetchSize = child.conf.Consumer.Fetch.Max - } - } - } - - return nil, nil - } - - // we got messages, reset our fetch size in case it was increased for a previous request - child.fetchSize = child.conf.Consumer.Fetch.Default - atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) - - incomplete := false - prelude := true +func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) { var messages []*ConsumerMessage - for _, msgBlock := range block.MsgSet.Messages { + var incomplete bool + prelude := true + for _, msgBlock := range msgSet.Messages { for _, msg := range msgBlock.Messages() { offset := msg.Offset if msg.Msg.Version >= 1 { @@ -542,7 +511,46 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu incomplete = true } } + } + if incomplete || len(messages) == 0 { + return nil, ErrIncompleteResponse + } + return messages, nil +} + +func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*ConsumerMessage, error) { + var messages []*ConsumerMessage + var incomplete bool + prelude := true + batch := block.Records.recordBatch + + for _, rec := range batch.Records { + offset := batch.FirstOffset + rec.OffsetDelta + if prelude && offset < child.offset { + continue + } + prelude = false + + if offset >= child.offset { + messages = append(messages, &ConsumerMessage{ + Topic: child.topic, + Partition: child.partition, + Key: rec.Key, + Value: rec.Value, + Offset: offset, + Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta), + Headers: rec.Headers, + }) + child.offset = offset + 1 + } else { + incomplete = true + } + + if child.offset > block.LastStableOffset { + // We reached the end of closed transactions + break + } } if incomplete || len(messages) == 0 { @@ -551,6 +559,57 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu return messages, nil } +func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { + block := response.GetBlock(child.topic, child.partition) + if block == nil { + return nil, ErrIncompleteResponse + } + + if block.Err != ErrNoError { + return nil, block.Err + } + + nRecs, err := block.Records.numRecords() + if err != nil { + return nil, err + } + if nRecs == 0 { + partialTrailingMessage, err := block.Records.isPartial() + if err != nil { + return nil, err + } + // We got no messages. If we got a trailing one then we need to ask for more data. + // Otherwise we just poll again and wait for one to be produced... + if partialTrailingMessage { + if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max { + // we can't ask for more data, we've hit the configured limit + child.sendError(ErrMessageTooLarge) + child.offset++ // skip this one so we can keep processing future messages + } else { + child.fetchSize *= 2 + if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max { + child.fetchSize = child.conf.Consumer.Fetch.Max + } + } + } + + return nil, nil + } + + // we got messages, reset our fetch size in case it was increased for a previous request + child.fetchSize = child.conf.Consumer.Fetch.Default + atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) + + if control, err := block.Records.isControl(); err != nil || control { + return nil, err + } + + if response.Version < 4 { + return child.parseMessages(block.Records.msgSet) + } + return child.parseRecords(block) +} + // brokerConsumer type brokerConsumer struct { @@ -740,6 +799,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request.Version = 3 request.MaxBytes = MaxResponseSize } + if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) { + request.Version = 4 + request.Isolation = ReadUncommitted // We don't support yet transactions. + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/vendor/github.com/Shopify/sarama/consumer_test.go b/vendor/github.com/Shopify/sarama/consumer_test.go index 48f309b6d..1526a361c 100644 --- a/vendor/github.com/Shopify/sarama/consumer_test.go +++ b/vendor/github.com/Shopify/sarama/consumer_test.go @@ -379,86 +379,118 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) { // requested, then such messages are ignored. func TestConsumerExtraOffsets(t *testing.T) { // Given - broker0 := NewMockBroker(t, 0) - fetchResponse1 := &FetchResponse{} - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3) - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4) - fetchResponse2 := &FetchResponse{} - fetchResponse2.AddError("my_topic", 0, ErrNoError) - broker0.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetBroker(broker0.Addr(), broker0.BrokerID()). - SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": NewMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 1234). - SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), - }) + legacyFetchResponse := &FetchResponse{} + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1) + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) + newFetchResponse := &FetchResponse{Version: 4} + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1) + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4) + newFetchResponse.SetLastStableOffset("my_topic", 0, 4) + for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { + var offsetResponseVersion int16 + cfg := NewConfig() + if fetchResponse1.Version >= 4 { + cfg.Version = V0_11_0_0 + offsetResponseVersion = 1 + } - master, err := NewConsumer([]string{broker0.Addr()}, nil) - if err != nil { - t.Fatal(err) - } + broker0 := NewMockBroker(t, 0) + fetchResponse2 := &FetchResponse{} + fetchResponse2.Version = fetchResponse1.Version + fetchResponse2.AddError("my_topic", 0, ErrNoError) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(offsetResponseVersion). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } - // When - consumer, err := master.ConsumePartition("my_topic", 0, 3) - if err != nil { - t.Fatal(err) - } + // When + consumer, err := master.ConsumePartition("my_topic", 0, 3) + if err != nil { + t.Fatal(err) + } - // Then: messages with offsets 1 and 2 are not returned even though they - // are present in the response. - assertMessageOffset(t, <-consumer.Messages(), 3) - assertMessageOffset(t, <-consumer.Messages(), 4) + // Then: messages with offsets 1 and 2 are not returned even though they + // are present in the response. + assertMessageOffset(t, <-consumer.Messages(), 3) + assertMessageOffset(t, <-consumer.Messages(), 4) - safeClose(t, consumer) - safeClose(t, master) - broker0.Close() + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + } } // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) { // Given - broker0 := NewMockBroker(t, 0) - fetchResponse1 := &FetchResponse{} - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5) - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7) - fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11) - fetchResponse2 := &FetchResponse{} - fetchResponse2.AddError("my_topic", 0, ErrNoError) - broker0.SetHandlerByMap(map[string]MockResponse{ - "MetadataRequest": NewMockMetadataResponse(t). - SetBroker(broker0.Addr(), broker0.BrokerID()). - SetLeader("my_topic", 0, broker0.BrokerID()), - "OffsetRequest": NewMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 1234). - SetOffset("my_topic", 0, OffsetOldest, 0), - "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), - }) + legacyFetchResponse := &FetchResponse{} + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) + legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) + newFetchResponse := &FetchResponse{Version: 4} + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) + newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) + newFetchResponse.SetLastStableOffset("my_topic", 0, 11) + for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { + var offsetResponseVersion int16 + cfg := NewConfig() + if fetchResponse1.Version >= 4 { + cfg.Version = V0_11_0_0 + offsetResponseVersion = 1 + } - master, err := NewConsumer([]string{broker0.Addr()}, nil) - if err != nil { - t.Fatal(err) - } + broker0 := NewMockBroker(t, 0) + fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version} + fetchResponse2.AddError("my_topic", 0, ErrNoError) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(offsetResponseVersion). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } - // When - consumer, err := master.ConsumePartition("my_topic", 0, 3) - if err != nil { - t.Fatal(err) - } + // When + consumer, err := master.ConsumePartition("my_topic", 0, 3) + if err != nil { + t.Fatal(err) + } - // Then: messages with offsets 1 and 2 are not returned even though they - // are present in the response. - assertMessageOffset(t, <-consumer.Messages(), 5) - assertMessageOffset(t, <-consumer.Messages(), 7) - assertMessageOffset(t, <-consumer.Messages(), 11) + // Then: messages with offsets 1 and 2 are not returned even though they + // are present in the response. + assertMessageOffset(t, <-consumer.Messages(), 5) + assertMessageOffset(t, <-consumer.Messages(), 7) + assertMessageOffset(t, <-consumer.Messages(), 11) - safeClose(t, consumer) - safeClose(t, master) - broker0.Close() + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + } } // If leadership for a partition is changing then consumer resolves the new diff --git a/vendor/github.com/Shopify/sarama/crc32_field.go b/vendor/github.com/Shopify/sarama/crc32_field.go index e7da08c6f..1f144431a 100644 --- a/vendor/github.com/Shopify/sarama/crc32_field.go +++ b/vendor/github.com/Shopify/sarama/crc32_field.go @@ -6,9 +6,19 @@ import ( "hash/crc32" ) +type crcPolynomial int8 + +const ( + crcIEEE crcPolynomial = iota + crcCastagnoli +) + +var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) + // crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s. type crc32Field struct { startOffset int + polynomial crcPolynomial } func (c *crc32Field) saveOffset(in int) { @@ -19,14 +29,24 @@ func (c *crc32Field) reserveLength() int { return 4 } +func newCRC32Field(polynomial crcPolynomial) *crc32Field { + return &crc32Field{polynomial: polynomial} +} + func (c *crc32Field) run(curOffset int, buf []byte) error { - crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset]) + crc, err := c.crc(curOffset, buf) + if err != nil { + return err + } binary.BigEndian.PutUint32(buf[c.startOffset:], crc) return nil } func (c *crc32Field) check(curOffset int, buf []byte) error { - crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset]) + crc, err := c.crc(curOffset, buf) + if err != nil { + return err + } expected := binary.BigEndian.Uint32(buf[c.startOffset:]) if crc != expected { @@ -35,3 +55,15 @@ func (c *crc32Field) check(curOffset int, buf []byte) error { return nil } +func (c *crc32Field) crc(curOffset int, buf []byte) (uint32, error) { + var tab *crc32.Table + switch c.polynomial { + case crcIEEE: + tab = crc32.IEEETable + case crcCastagnoli: + tab = castagnoliTable + default: + return 0, PacketDecodingError{"invalid CRC type"} + } + return crc32.Checksum(buf[c.startOffset+4:curOffset], tab), nil +} diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go index 65600e86e..8c8e3a5af 100644 --- a/vendor/github.com/Shopify/sarama/fetch_request.go +++ b/vendor/github.com/Shopify/sarama/fetch_request.go @@ -29,16 +29,27 @@ type FetchRequest struct { MinBytes int32 MaxBytes int32 Version int16 + Isolation IsolationLevel blocks map[string]map[int32]*fetchRequestBlock } +type IsolationLevel int8 + +const ( + ReadUncommitted IsolationLevel = 0 + ReadCommitted IsolationLevel = 1 +) + func (r *FetchRequest) encode(pe packetEncoder) (err error) { pe.putInt32(-1) // replica ID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) - if r.Version == 3 { + if r.Version >= 3 { pe.putInt32(r.MaxBytes) } + if r.Version >= 4 { + pe.putInt8(int8(r.Isolation)) + } err = pe.putArrayLength(len(r.blocks)) if err != nil { return err @@ -74,11 +85,18 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { if r.MinBytes, err = pd.getInt32(); err != nil { return err } - if r.Version == 3 { + if r.Version >= 3 { if r.MaxBytes, err = pd.getInt32(); err != nil { return err } } + if r.Version >= 4 { + isolation, err := pd.getInt8() + if err != nil { + return err + } + r.Isolation = IsolationLevel(isolation) + } topicCount, err := pd.getArrayLength() if err != nil { return err @@ -128,6 +146,8 @@ func (r *FetchRequest) requiredVersion() KafkaVersion { return V0_10_0_0 case 3: return V0_10_1_0 + case 4: + return V0_11_0_0 default: return minVersion } diff --git a/vendor/github.com/Shopify/sarama/fetch_request_test.go b/vendor/github.com/Shopify/sarama/fetch_request_test.go index 32c083c7d..1a94c2d1f 100644 --- a/vendor/github.com/Shopify/sarama/fetch_request_test.go +++ b/vendor/github.com/Shopify/sarama/fetch_request_test.go @@ -17,6 +17,15 @@ var ( 0x00, 0x05, 't', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} + + fetchRequestOneBlockV4 = []byte{ + 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0xFF, + 0x01, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, 0x00, 0x00, 0x00, 0x56} ) func TestFetchRequest(t *testing.T) { @@ -31,4 +40,9 @@ func TestFetchRequest(t *testing.T) { request.MinBytes = 0 request.AddBlock("topic", 0x12, 0x34, 0x56) testRequest(t, "one block", request, fetchRequestOneBlock) + + request.Version = 4 + request.MaxBytes = 0xFF + request.Isolation = ReadCommitted + testRequest(t, "one block v4", request, fetchRequestOneBlockV4) } diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go index b56b166c2..568d12f5e 100644 --- a/vendor/github.com/Shopify/sarama/fetch_response.go +++ b/vendor/github.com/Shopify/sarama/fetch_response.go @@ -2,13 +2,39 @@ package sarama import "time" +type AbortedTransaction struct { + ProducerID int64 + FirstOffset int64 +} + +func (t *AbortedTransaction) decode(pd packetDecoder) (err error) { + if t.ProducerID, err = pd.getInt64(); err != nil { + return err + } + + if t.FirstOffset, err = pd.getInt64(); err != nil { + return err + } + + return nil +} + +func (t *AbortedTransaction) encode(pe packetEncoder) (err error) { + pe.putInt64(t.ProducerID) + pe.putInt64(t.FirstOffset) + + return nil +} + type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 - MsgSet MessageSet + LastStableOffset int64 + AbortedTransactions []*AbortedTransaction + Records Records } -func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) { +func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -20,27 +46,75 @@ func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) { return err } - msgSetSize, err := pd.getInt32() + if version >= 4 { + b.LastStableOffset, err = pd.getInt64() + if err != nil { + return err + } + + numTransact, err := pd.getArrayLength() + if err != nil { + return err + } + + if numTransact >= 0 { + b.AbortedTransactions = make([]*AbortedTransaction, numTransact) + } + + for i := 0; i < numTransact; i++ { + transact := new(AbortedTransaction) + if err = transact.decode(pd); err != nil { + return err + } + b.AbortedTransactions[i] = transact + } + } + + recordsSize, err := pd.getInt32() if err != nil { return err } - msgSetDecoder, err := pd.getSubset(int(msgSetSize)) + recordsDecoder, err := pd.getSubset(int(recordsSize)) if err != nil { return err } - err = (&b.MsgSet).decode(msgSetDecoder) + var records Records + if version >= 4 { + records = newDefaultRecords(nil) + } else { + records = newLegacyRecords(nil) + } + if recordsSize > 0 { + if err = records.decode(recordsDecoder); err != nil { + return err + } + } + b.Records = records - return err + return nil } -func (b *FetchResponseBlock) encode(pe packetEncoder) (err error) { +func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(b.Err)) pe.putInt64(b.HighWaterMarkOffset) + if version >= 4 { + pe.putInt64(b.LastStableOffset) + + if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil { + return err + } + for _, transact := range b.AbortedTransactions { + if err = transact.encode(pe); err != nil { + return err + } + } + } + pe.push(&lengthField{}) - err = b.MsgSet.encode(pe) + err = b.Records.encode(pe) if err != nil { return err } @@ -90,7 +164,7 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { } block := new(FetchResponseBlock) - err = block.decode(pd) + err = block.decode(pd, version) if err != nil { return err } @@ -124,7 +198,7 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) { for id, block := range partitions { pe.putInt32(id) - err = block.encode(pe) + err = block.encode(pe, r.Version) if err != nil { return err } @@ -148,6 +222,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_10_1_0 + case 4: + return V0_11_0_0 default: return minVersion } @@ -182,7 +260,7 @@ func (r *FetchResponse) AddError(topic string, partition int32, err KError) { frb.Err = err } -func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { +func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock { if r.Blocks == nil { r.Blocks = make(map[string]map[int32]*FetchResponseBlock) } @@ -196,6 +274,11 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc frb = new(FetchResponseBlock) partitions[partition] = frb } + + return frb +} + +func encodeKV(key, value Encoder) ([]byte, []byte) { var kb []byte var vb []byte if key != nil { @@ -204,7 +287,36 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc if value != nil { vb, _ = value.Encode() } + + return kb, vb +} + +func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { + frb := r.getOrCreateBlock(topic, partition) + kb, vb := encodeKV(key, value) msg := &Message{Key: kb, Value: vb} msgBlock := &MessageBlock{Msg: msg, Offset: offset} - frb.MsgSet.Messages = append(frb.MsgSet.Messages, msgBlock) + set := frb.Records.msgSet + if set == nil { + set = &MessageSet{} + frb.Records = newLegacyRecords(set) + } + set.Messages = append(set.Messages, msgBlock) +} + +func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) { + frb := r.getOrCreateBlock(topic, partition) + kb, vb := encodeKV(key, value) + rec := &Record{Key: kb, Value: vb, OffsetDelta: offset} + batch := frb.Records.recordBatch + if batch == nil { + batch = &RecordBatch{Version: 2} + frb.Records = newDefaultRecords(batch) + } + batch.addRecord(rec) +} + +func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) { + frb := r.getOrCreateBlock(topic, partition) + frb.LastStableOffset = offset } diff --git a/vendor/github.com/Shopify/sarama/fetch_response_test.go b/vendor/github.com/Shopify/sarama/fetch_response_test.go index 52fb5a74c..f0e5d87ff 100644 --- a/vendor/github.com/Shopify/sarama/fetch_response_test.go +++ b/vendor/github.com/Shopify/sarama/fetch_response_test.go @@ -26,6 +26,43 @@ var ( 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + + oneRecordFetchResponse = []byte{ + 0x00, 0x00, 0x00, 0x00, // ThrottleTime + 0x00, 0x00, 0x00, 0x01, // Number of Topics + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Number of Partitions + 0x00, 0x00, 0x00, 0x05, // Partition + 0x00, 0x01, // Error + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // High Watermark Offset + 0x00, 0x00, 0x00, 0x00, 0x10, 0x10, 0x10, 0x10, // Last Stable Offset + 0x00, 0x00, 0x00, 0x00, // Number of Aborted Transactions + 0x00, 0x00, 0x00, 0x52, // Records length + // recordBatch + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x46, + 0x00, 0x00, 0x00, 0x00, + 0x02, + 0xDB, 0x47, 0x14, 0xC9, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // record + 0x28, + 0x00, + 0x0A, + 0x00, + 0x08, 0x01, 0x02, 0x03, 0x04, + 0x06, 0x05, 0x06, 0x07, + 0x02, + 0x06, 0x08, 0x09, 0x0A, + 0x04, 0x0B, 0x0C, + } ) func TestEmptyFetchResponse(t *testing.T) { @@ -60,14 +97,22 @@ func TestOneMessageFetchResponse(t *testing.T) { if block.HighWaterMarkOffset != 0x10101010 { t.Error("Decoding didn't produce correct high water mark offset.") } - if block.MsgSet.PartialTrailingMessage { + partial, err := block.Records.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if partial { t.Error("Decoding detected a partial trailing message where there wasn't one.") } - if len(block.MsgSet.Messages) != 1 { + n, err := block.Records.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 1 { t.Fatal("Decoding produced incorrect number of messages.") } - msgBlock := block.MsgSet.Messages[0] + msgBlock := block.Records.msgSet.Messages[0] if msgBlock.Offset != 0x550000 { t.Error("Decoding produced incorrect message offset.") } @@ -82,3 +127,49 @@ func TestOneMessageFetchResponse(t *testing.T) { t.Error("Decoding produced incorrect message value.") } } + +func TestOneRecordFetchResponse(t *testing.T) { + response := FetchResponse{} + testVersionDecodable(t, "one record", &response, oneRecordFetchResponse, 4) + + if len(response.Blocks) != 1 { + t.Fatal("Decoding produced incorrect number of topic blocks.") + } + + if len(response.Blocks["topic"]) != 1 { + t.Fatal("Decoding produced incorrect number of partition blocks for topic.") + } + + block := response.GetBlock("topic", 5) + if block == nil { + t.Fatal("GetBlock didn't return block.") + } + if block.Err != ErrOffsetOutOfRange { + t.Error("Decoding didn't produce correct error code.") + } + if block.HighWaterMarkOffset != 0x10101010 { + t.Error("Decoding didn't produce correct high water mark offset.") + } + partial, err := block.Records.isPartial() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if partial { + t.Error("Decoding detected a partial trailing record where there wasn't one.") + } + + n, err := block.Records.numRecords() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if n != 1 { + t.Fatal("Decoding produced incorrect number of records.") + } + rec := block.Records.recordBatch.Records[0] + if !bytes.Equal(rec.Key, []byte{0x01, 0x02, 0x03, 0x04}) { + t.Error("Decoding produced incorrect record key.") + } + if !bytes.Equal(rec.Value, []byte{0x05, 0x06, 0x07}) { + t.Error("Decoding produced incorrect record value.") + } +} diff --git a/vendor/github.com/Shopify/sarama/length_field.go b/vendor/github.com/Shopify/sarama/length_field.go index 70078be5d..576b1a6f6 100644 --- a/vendor/github.com/Shopify/sarama/length_field.go +++ b/vendor/github.com/Shopify/sarama/length_field.go @@ -27,3 +27,43 @@ func (l *lengthField) check(curOffset int, buf []byte) error { return nil } + +type varintLengthField struct { + startOffset int + length int64 +} + +func (l *varintLengthField) decode(pd packetDecoder) error { + var err error + l.length, err = pd.getVarint() + return err +} + +func (l *varintLengthField) saveOffset(in int) { + l.startOffset = in +} + +func (l *varintLengthField) adjustLength(currOffset int) int { + oldFieldSize := l.reserveLength() + l.length = int64(currOffset - l.startOffset - oldFieldSize) + + return l.reserveLength() - oldFieldSize +} + +func (l *varintLengthField) reserveLength() int { + var tmp [binary.MaxVarintLen64]byte + return binary.PutVarint(tmp[:], l.length) +} + +func (l *varintLengthField) run(curOffset int, buf []byte) error { + binary.PutVarint(buf[l.startOffset:], l.length) + return nil +} + +func (l *varintLengthField) check(curOffset int, buf []byte) error { + if int64(curOffset-l.startOffset-l.reserveLength()) != l.length { + return PacketDecodingError{"length field invalid"} + } + + return nil +} diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go index 06f175f67..bd5650bbc 100644 --- a/vendor/github.com/Shopify/sarama/message.go +++ b/vendor/github.com/Shopify/sarama/message.go @@ -37,7 +37,7 @@ type Message struct { } func (m *Message) encode(pe packetEncoder) error { - pe.push(&crc32Field{}) + pe.push(newCRC32Field(crcIEEE)) pe.putInt8(m.Version) @@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error { pe.putInt8(attributes) if m.Version >= 1 { - timestamp := int64(-1) - - if !m.Timestamp.Before(time.Unix(0, 0)) { - timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond) - } else if !m.Timestamp.IsZero() { - return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)} + if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil { + return err } - - pe.putInt64(timestamp) } err := pe.putBytes(m.Key) @@ -112,7 +106,7 @@ func (m *Message) encode(pe packetEncoder) error { } func (m *Message) decode(pd packetDecoder) (err error) { - err = pd.push(&crc32Field{}) + err = pd.push(newCRC32Field(crcIEEE)) if err != nil { return err } @@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) { m.Codec = CompressionCodec(attribute & compressionCodecMask) if m.Version == 1 { - millis, err := pd.getInt64() - if err != nil { + if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil { return err } - - // negative timestamps are invalid, in these cases we should return - // a zero time - timestamp := time.Time{} - if millis >= 0 { - timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) - } - - m.Timestamp = timestamp } m.Key, err = pd.getBytes() diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go index d94bd24c6..9659757b7 100644 --- a/vendor/github.com/Shopify/sarama/mockresponses.go +++ b/vendor/github.com/Shopify/sarama/mockresponses.go @@ -122,6 +122,7 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder { type MockOffsetResponse struct { offsets map[string]map[int32]map[int64]int64 t TestReporter + version int16 } func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse { @@ -131,6 +132,11 @@ func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse { } } +func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse { + mor.version = version + return mor +} + func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse { partitions := mor.offsets[topic] if partitions == nil { @@ -148,7 +154,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder { offsetRequest := reqBody.(*OffsetRequest) - offsetResponse := &OffsetResponse{} + offsetResponse := &OffsetResponse{Version: mor.version} for topic, partitions := range offsetRequest.blocks { for partition, block := range partitions { offset := mor.getOffset(topic, partition, block.time) @@ -402,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) res := &ProduceResponse{} - for topic, partitions := range req.msgSets { + for topic, partitions := range req.records { for partition := range partitions { res.AddTopicPartition(topic, partition, mr.getError(topic, partition)) } diff --git a/vendor/github.com/Shopify/sarama/packet_decoder.go b/vendor/github.com/Shopify/sarama/packet_decoder.go index 28670c0e6..904e28074 100644 --- a/vendor/github.com/Shopify/sarama/packet_decoder.go +++ b/vendor/github.com/Shopify/sarama/packet_decoder.go @@ -9,11 +9,15 @@ type packetDecoder interface { getInt16() (int16, error) getInt32() (int32, error) getInt64() (int64, error) + getVarint() (int64, error) getArrayLength() (int, error) // Collections getBytes() ([]byte, error) + getVarintBytes() ([]byte, error) + getRawBytes(length int) ([]byte, error) getString() (string, error) + getNullableString() (*string, error) getInt32Array() ([]int32, error) getInt64Array() ([]int64, error) getStringArray() ([]string, error) @@ -43,3 +47,12 @@ type pushDecoder interface { // of data from the saved offset, and verify it based on the data between the saved offset and curOffset. check(curOffset int, buf []byte) error } + +// dynamicPushDecoder extends the interface of pushDecoder for uses cases where the length of the +// fields itself is unknown until its value was decoded (for instance varint encoded length +// fields). +// During push, dynamicPushDecoder.decode() method will be called instead of reserveLength() +type dynamicPushDecoder interface { + pushDecoder + decoder +} diff --git a/vendor/github.com/Shopify/sarama/packet_encoder.go b/vendor/github.com/Shopify/sarama/packet_encoder.go index 27a10f6d4..aecd2b80c 100644 --- a/vendor/github.com/Shopify/sarama/packet_encoder.go +++ b/vendor/github.com/Shopify/sarama/packet_encoder.go @@ -11,12 +11,15 @@ type packetEncoder interface { putInt16(in int16) putInt32(in int32) putInt64(in int64) + putVarint(in int64) putArrayLength(in int) error // Collections putBytes(in []byte) error + putVarintBytes(in []byte) error putRawBytes(in []byte) error putString(in string) error + putNullableString(in *string) error putStringArray(in []string) error putInt32Array(in []int32) error putInt64Array(in []int64) error @@ -48,3 +51,14 @@ type pushEncoder interface { // of data to the saved offset, based on the data between the saved offset and curOffset. run(curOffset int, buf []byte) error } + +// dynamicPushEncoder extends the interface of pushEncoder for uses cases where the length of the +// fields itself is unknown until its value was computed (for instance varint encoded length +// fields). +type dynamicPushEncoder interface { + pushEncoder + + // Called during pop() to adjust the length of the field. + // It should return the difference in bytes between the last computed length and current length. + adjustLength(currOffset int) int +} diff --git a/vendor/github.com/Shopify/sarama/prep_encoder.go b/vendor/github.com/Shopify/sarama/prep_encoder.go index fd5ea0f91..d99cd71ad 100644 --- a/vendor/github.com/Shopify/sarama/prep_encoder.go +++ b/vendor/github.com/Shopify/sarama/prep_encoder.go @@ -1,6 +1,7 @@ package sarama import ( + "encoding/binary" "fmt" "math" @@ -8,6 +9,7 @@ import ( ) type prepEncoder struct { + stack []pushEncoder length int } @@ -29,6 +31,11 @@ func (pe *prepEncoder) putInt64(in int64) { pe.length += 8 } +func (pe *prepEncoder) putVarint(in int64) { + var buf [binary.MaxVarintLen64]byte + pe.length += binary.PutVarint(buf[:], in) +} + func (pe *prepEncoder) putArrayLength(in int) error { if in > math.MaxInt32 { return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)} @@ -44,11 +51,16 @@ func (pe *prepEncoder) putBytes(in []byte) error { if in == nil { return nil } - if len(in) > math.MaxInt32 { - return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))} + return pe.putRawBytes(in) +} + +func (pe *prepEncoder) putVarintBytes(in []byte) error { + if in == nil { + pe.putVarint(-1) + return nil } - pe.length += len(in) - return nil + pe.putVarint(int64(len(in))) + return pe.putRawBytes(in) } func (pe *prepEncoder) putRawBytes(in []byte) error { @@ -59,6 +71,14 @@ func (pe *prepEncoder) putRawBytes(in []byte) error { return nil } +func (pe *prepEncoder) putNullableString(in *string) error { + if in == nil { + pe.length += 2 + return nil + } + return pe.putString(*in) +} + func (pe *prepEncoder) putString(in string) error { pe.length += 2 if len(in) > math.MaxInt16 { @@ -108,10 +128,18 @@ func (pe *prepEncoder) offset() int { // stackable func (pe *prepEncoder) push(in pushEncoder) { + in.saveOffset(pe.length) pe.length += in.reserveLength() + pe.stack = append(pe.stack, in) } func (pe *prepEncoder) pop() error { + in := pe.stack[len(pe.stack)-1] + pe.stack = pe.stack[:len(pe.stack)-1] + if dpe, ok := in.(dynamicPushEncoder); ok { + pe.length += dpe.adjustLength(pe.length) + } + return nil } diff --git a/vendor/github.com/Shopify/sarama/produce_request.go b/vendor/github.com/Shopify/sarama/produce_request.go index 40dc80151..300984cef 100644 --- a/vendor/github.com/Shopify/sarama/produce_request.go +++ b/vendor/github.com/Shopify/sarama/produce_request.go @@ -21,19 +21,56 @@ const ( ) type ProduceRequest struct { - RequiredAcks RequiredAcks - Timeout int32 - Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 - msgSets map[string]map[int32]*MessageSet + TransactionalID *string + RequiredAcks RequiredAcks + Timeout int32 + Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 + records map[string]map[int32]Records +} + +func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + var topicRecordCount int64 + for _, messageBlock := range msgSet.Messages { + // Is this a fake "message" wrapping real messages? + if messageBlock.Msg.Set != nil { + topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) + } else { + // A single uncompressed message + topicRecordCount++ + } + // Better be safe than sorry when computing the compression ratio + if messageBlock.Msg.compressedSize != 0 { + compressionRatio := float64(len(messageBlock.Msg.Value)) / + float64(messageBlock.Msg.compressedSize) + // Histogram do not support decimal values, let's multiple it by 100 for better precision + intCompressionRatio := int64(100 * compressionRatio) + compressionRatioMetric.Update(intCompressionRatio) + topicCompressionRatioMetric.Update(intCompressionRatio) + } + } + return topicRecordCount +} + +func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram, + topicCompressionRatioMetric metrics.Histogram) int64 { + if recordBatch.compressedRecords != nil { + compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100) + compressionRatioMetric.Update(compressionRatio) + topicCompressionRatioMetric.Update(compressionRatio) + } + + return int64(len(recordBatch.Records)) } func (r *ProduceRequest) encode(pe packetEncoder) error { + if r.Version >= 3 { + if err := pe.putNullableString(r.TransactionalID); err != nil { + return err + } + } pe.putInt16(int16(r.RequiredAcks)) pe.putInt32(r.Timeout) - err := pe.putArrayLength(len(r.msgSets)) - if err != nil { - return err - } metricRegistry := pe.metricRegistry() var batchSizeMetric metrics.Histogram var compressionRatioMetric metrics.Histogram @@ -41,9 +78,14 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) } - totalRecordCount := int64(0) - for topic, partitions := range r.msgSets { + + err := pe.putArrayLength(len(r.records)) + if err != nil { + return err + } + + for topic, partitions := range r.records { err = pe.putString(topic) if err != nil { return err @@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { if metricRegistry != nil { topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) } - for id, msgSet := range partitions { + for id, records := range partitions { startOffset := pe.offset() pe.putInt32(id) pe.push(&lengthField{}) - err = msgSet.encode(pe) + err = records.encode(pe) if err != nil { return err } @@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { return err } if metricRegistry != nil { - for _, messageBlock := range msgSet.Messages { - // Is this a fake "message" wrapping real messages? - if messageBlock.Msg.Set != nil { - topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) - } else { - // A single uncompressed message - topicRecordCount++ - } - // Better be safe than sorry when computing the compression ratio - if messageBlock.Msg.compressedSize != 0 { - compressionRatio := float64(len(messageBlock.Msg.Value)) / - float64(messageBlock.Msg.compressedSize) - // Histogram do not support decimal values, let's multiple it by 100 for better precision - intCompressionRatio := int64(100 * compressionRatio) - compressionRatioMetric.Update(intCompressionRatio) - topicCompressionRatioMetric.Update(intCompressionRatio) - } + if r.Version >= 3 { + topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric) + } else { + topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric) } batchSize := int64(pe.offset() - startOffset) batchSizeMetric.Update(batchSize) @@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { } func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { + r.Version = version + + if version >= 3 { + id, err := pd.getNullableString() + if err != nil { + return err + } + r.TransactionalID = id + } requiredAcks, err := pd.getInt16() if err != nil { return err @@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if topicCount == 0 { return nil } - r.msgSets = make(map[string]map[int32]*MessageSet) + + r.records = make(map[string]map[int32]Records) for i := 0; i < topicCount; i++ { topic, err := pd.getString() if err != nil { @@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { if err != nil { return err } - r.msgSets[topic] = make(map[int32]*MessageSet) + r.records[topic] = make(map[int32]Records) + for j := 0; j < partitionCount; j++ { partition, err := pd.getInt32() if err != nil { return err } - messageSetSize, err := pd.getInt32() + size, err := pd.getInt32() if err != nil { return err } - msgSetDecoder, err := pd.getSubset(int(messageSetSize)) + recordsDecoder, err := pd.getSubset(int(size)) if err != nil { return err } - msgSet := &MessageSet{} - err = msgSet.decode(msgSetDecoder) - if err != nil { + var records Records + if version >= 3 { + records = newDefaultRecords(nil) + } else { + records = newLegacyRecords(nil) + } + if err := records.decode(recordsDecoder); err != nil { return err } - r.msgSets[topic][partition] = msgSet + r.records[topic][partition] = records } } + return nil } @@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } } -func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) +func (r *ProduceRequest) ensureRecords(topic string, partition int32) { + if r.records == nil { + r.records = make(map[string]map[int32]Records) } - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) + if r.records[topic] == nil { + r.records[topic] = make(map[int32]Records) } +} - set := r.msgSets[topic][partition] +func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { + r.ensureRecords(topic, partition) + set := r.records[topic][partition].msgSet if set == nil { set = new(MessageSet) - r.msgSets[topic][partition] = set + r.records[topic][partition] = newLegacyRecords(set) } set.addMessage(msg) } func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { - if r.msgSets == nil { - r.msgSets = make(map[string]map[int32]*MessageSet) - } - - if r.msgSets[topic] == nil { - r.msgSets[topic] = make(map[int32]*MessageSet) - } + r.ensureRecords(topic, partition) + r.records[topic][partition] = newLegacyRecords(set) +} - r.msgSets[topic][partition] = set +func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) { + r.ensureRecords(topic, partition) + r.records[topic][partition] = newDefaultRecords(batch) } diff --git a/vendor/github.com/Shopify/sarama/produce_request_test.go b/vendor/github.com/Shopify/sarama/produce_request_test.go index 21f4ba5b1..be6459596 100644 --- a/vendor/github.com/Shopify/sarama/produce_request_test.go +++ b/vendor/github.com/Shopify/sarama/produce_request_test.go @@ -2,6 +2,7 @@ package sarama import ( "testing" + "time" ) var ( @@ -32,6 +33,41 @@ var ( 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} + + produceRequestOneRecord = []byte{ + 0xFF, 0xFF, // Transaction ID + 0x01, 0x23, // Required Acks + 0x00, 0x00, 0x04, 0x44, // Timeout + 0x00, 0x00, 0x00, 0x01, // Number of Topics + 0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic + 0x00, 0x00, 0x00, 0x01, // Number of Partitions + 0x00, 0x00, 0x00, 0xAD, // Partition + 0x00, 0x00, 0x00, 0x52, // Records length + // recordBatch + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x46, + 0x00, 0x00, 0x00, 0x00, + 0x02, + 0x54, 0x79, 0x61, 0xFD, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + // record + 0x28, + 0x00, + 0x0A, + 0x00, + 0x08, 0x01, 0x02, 0x03, 0x04, + 0x06, 0x05, 0x06, 0x07, + 0x02, + 0x06, 0x08, 0x09, 0x0A, + 0x04, 0x0B, 0x0C, + } ) func TestProduceRequest(t *testing.T) { @@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) { request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}}) testRequest(t, "one message", request, produceRequestOneMessage) + + request.Version = 3 + batch := &RecordBatch{ + Version: 2, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{0x01, 0x02, 0x03, 0x04}, + Value: []byte{0x05, 0x06, 0x07}, + Headers: []*RecordHeader{{ + Key: []byte{0x08, 0x09, 0x0A}, + Value: []byte{0x0B, 0x0C}, + }}, + }}, + } + request.AddBatch("topic", 0xAD, batch) + packet := testRequestEncode(t, "one record", request, produceRequestOneRecord) + batch.Records[0].length.startOffset = 0 + testRequestDecode(t, "one record", request, packet) } diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go index 3f05dd9fb..043c40f87 100644 --- a/vendor/github.com/Shopify/sarama/produce_response.go +++ b/vendor/github.com/Shopify/sarama/produce_response.go @@ -1,6 +1,9 @@ package sarama -import "time" +import ( + "fmt" + "time" +) type ProduceResponseBlock struct { Err KError @@ -32,6 +35,23 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro return nil } +func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err error) { + pe.putInt16(int16(b.Err)) + pe.putInt64(b.Offset) + + if version >= 2 { + timestamp := int64(-1) + if !b.Timestamp.Before(time.Unix(0, 0)) { + timestamp = b.Timestamp.UnixNano() / int64(time.Millisecond) + } else if !b.Timestamp.IsZero() { + return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", b.Timestamp)} + } + pe.putInt64(timestamp) + } + + return nil +} + type ProduceResponse struct { Blocks map[string]map[int32]*ProduceResponseBlock Version int16 @@ -103,8 +123,10 @@ func (r *ProduceResponse) encode(pe packetEncoder) error { } for id, prb := range partitions { pe.putInt32(id) - pe.putInt16(int16(prb.Err)) - pe.putInt64(prb.Offset) + err = prb.encode(pe, r.Version) + if err != nil { + return err + } } } if r.Version >= 1 { @@ -127,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion { return V0_9_0_0 case 2: return V0_10_0_0 + case 3: + return V0_11_0_0 default: return minVersion } diff --git a/vendor/github.com/Shopify/sarama/produce_response_test.go b/vendor/github.com/Shopify/sarama/produce_response_test.go index f71709fe8..197c7fb50 100644 --- a/vendor/github.com/Shopify/sarama/produce_response_test.go +++ b/vendor/github.com/Shopify/sarama/produce_response_test.go @@ -1,67 +1,128 @@ package sarama -import "testing" +import ( + "fmt" + "testing" + "time" +) var ( - produceResponseNoBlocks = []byte{ + produceResponseNoBlocksV0 = []byte{ 0x00, 0x00, 0x00, 0x00} - produceResponseManyBlocks = []byte{ - 0x00, 0x00, 0x00, 0x02, + produceResponseManyBlocksVersions = [][]byte{ + { + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 + }, { + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'f', 'o', 'o', - 0x00, 0x00, 0x00, 0x00, + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, 'b', 'a', 'r', - 0x00, 0x00, 0x00, 0x02, + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time + }, { + 0x00, 0x00, 0x00, 0x01, - 0x00, 0x00, 0x00, 0x02, - 0x00, 0x02, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + 0x00, 0x03, 'f', 'o', 'o', + 0x00, 0x00, 0x00, 0x01, + + 0x00, 0x00, 0x00, 0x01, // Partition 1 + 0x00, 0x02, // ErrInvalidMessage + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255 + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used) + + 0x00, 0x00, 0x00, 0x64, // 100 ms throttle time + }, + } ) -func TestProduceResponse(t *testing.T) { +func TestProduceResponseDecode(t *testing.T) { response := ProduceResponse{} - testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocks, 0) + testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0) if len(response.Blocks) != 0 { t.Error("Decoding produced", len(response.Blocks), "topics where there were none") } - testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, 0) - if len(response.Blocks) != 2 { - t.Error("Decoding produced", len(response.Blocks), "topics where there were 2") - } - if len(response.Blocks["foo"]) != 0 { - t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there were none") - } - if len(response.Blocks["bar"]) != 2 { - t.Error("Decoding produced", len(response.Blocks["bar"]), "partitions for 'bar' where there were two") - } - block := response.GetBlock("bar", 1) - if block == nil { - t.Error("Decoding did not produce a block for bar/1") - } else { - if block.Err != ErrNoError { - t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err)) + for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions { + t.Logf("Decoding produceResponseManyBlocks version %d", v) + testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v)) + if len(response.Blocks) != 1 { + t.Error("Decoding produced", len(response.Blocks), "topics where there was 1") } - if block.Offset != 0xFF { - t.Error("Decoding failed for bar/1/Offset, got:", block.Offset) + if len(response.Blocks["foo"]) != 1 { + t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one") } - } - block = response.GetBlock("bar", 2) - if block == nil { - t.Error("Decoding did not produce a block for bar/2") - } else { - if block.Err != ErrInvalidMessage { - t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err)) + block := response.GetBlock("foo", 1) + if block == nil { + t.Error("Decoding did not produce a block for foo/1") + } else { + if block.Err != ErrInvalidMessage { + t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err)) + } + if block.Offset != 255 { + t.Error("Decoding failed for foo/1/Offset, got:", block.Offset) + } + if v >= 2 { + if block.Timestamp != time.Unix(1, 0) { + t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp) + } + } } - if block.Offset != 0 { - t.Error("Decoding failed for bar/2/Offset, got:", block.Offset) + if v >= 1 { + if expected := 100 * time.Millisecond; response.ThrottleTime != expected { + t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime) + } } } } + +func TestProduceResponseEncode(t *testing.T) { + response := ProduceResponse{} + response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) + testEncodable(t, "empty", &response, produceResponseNoBlocksV0) + + response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock) + response.Blocks["foo"][1] = &ProduceResponseBlock{ + Err: ErrInvalidMessage, + Offset: 255, + Timestamp: time.Unix(1, 0), + } + response.ThrottleTime = 100 * time.Millisecond + for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions { + response.Version = int16(v) + testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks) + } +} + +func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) { + response := ProduceResponse{} + response.Version = 2 + response.Blocks = make(map[string]map[int32]*ProduceResponseBlock) + response.Blocks["t"] = make(map[int32]*ProduceResponseBlock) + response.Blocks["t"][0] = &ProduceResponseBlock{ + Err: ErrNoError, + Offset: 0, + // Use a timestamp before Unix time + Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond), + } + response.ThrottleTime = 100 * time.Millisecond + _, err := encode(&response, nil) + if err == nil { + t.Error("Expecting error, got nil") + } + if _, ok := err.(PacketEncodingError); !ok { + t.Error("Expecting PacketEncodingError, got:", err) + } +} diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go index 158d9c475..9e1d50d3a 100644 --- a/vendor/github.com/Shopify/sarama/produce_set.go +++ b/vendor/github.com/Shopify/sarama/produce_set.go @@ -1,11 +1,14 @@ package sarama -import "time" +import ( + "encoding/binary" + "time" +) type partitionSet struct { - msgs []*ProducerMessage - setToSend *MessageSet - bufferBytes int + msgs []*ProducerMessage + recordsToSend Records + bufferBytes int } type produceSet struct { @@ -39,31 +42,64 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } } + timestamp := msg.Timestamp + if msg.Timestamp.IsZero() { + timestamp = time.Now() + } + partitions := ps.msgs[msg.Topic] if partitions == nil { partitions = make(map[int32]*partitionSet) ps.msgs[msg.Topic] = partitions } + var size int + set := partitions[msg.Partition] if set == nil { - set = &partitionSet{setToSend: new(MessageSet)} + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + batch := &RecordBatch{ + FirstTimestamp: timestamp, + Version: 2, + ProducerID: -1, /* No producer id */ + Codec: ps.parent.conf.Producer.Compression, + } + set = &partitionSet{recordsToSend: newDefaultRecords(batch)} + size = recordBatchOverhead + } else { + set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))} + } partitions[msg.Partition] = set } set.msgs = append(set.msgs, msg) - msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} - if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { - if msg.Timestamp.IsZero() { - msgToSend.Timestamp = time.Now() - } else { - msgToSend.Timestamp = msg.Timestamp + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + // We are being conservative here to avoid having to prep encode the record + size += maximumRecordOverhead + rec := &Record{ + Key: key, + Value: val, + TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp), + } + size += len(key) + len(val) + if len(msg.Headers) > 0 { + rec.Headers = make([]*RecordHeader, len(msg.Headers)) + for i, h := range msg.Headers { + rec.Headers[i] = &h + size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32 + } + } + set.recordsToSend.recordBatch.addRecord(rec) + } else { + msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + msgToSend.Timestamp = timestamp + msgToSend.Version = 1 } - msgToSend.Version = 1 + set.recordsToSend.msgSet.addMessage(msgToSend) + size = producerMessageOverhead + len(key) + len(val) } - set.setToSend.addMessage(msgToSend) - size := producerMessageOverhead + len(key) + len(val) set.bufferBytes += size ps.bufferBytes += size ps.bufferCount++ @@ -79,17 +115,24 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { req.Version = 2 } + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + req.Version = 3 + } for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { + if req.Version >= 3 { + req.AddBatch(topic, partition, set.recordsToSend.recordBatch) + continue + } if ps.parent.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, set.setToSend) + req.AddSet(topic, partition, set.recordsToSend.msgSet) } else { // When compression is enabled, the entire set for each partition is compressed // and sent as the payload of a single fake "message" with the appropriate codec // set and no key. When the server sees a message with a compression codec, it // decompresses the payload and treats the result as its message set. - payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry) + payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) @@ -98,11 +141,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: payload, - Set: set.setToSend, // Provide the underlying message set for accurate metrics + Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 - compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp + compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp } req.AddMessage(topic, partition, compMsg) } @@ -135,14 +178,19 @@ func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMe } func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + version := 1 + if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { + version = 2 + } + switch { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)): return true // Would we overflow the size-limit of a compressed message-batch for this partition? case ps.parent.conf.Producer.Compression != CompressionNone && ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && - ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: diff --git a/vendor/github.com/Shopify/sarama/produce_set_test.go b/vendor/github.com/Shopify/sarama/produce_set_test.go index d016a10b7..0f96e8818 100644 --- a/vendor/github.com/Shopify/sarama/produce_set_test.go +++ b/vendor/github.com/Shopify/sarama/produce_set_test.go @@ -137,7 +137,7 @@ func TestProduceSetRequestBuilding(t *testing.T) { t.Error("Timeout not set properly") } - if len(req.msgSets) != 2 { + if len(req.records) != 2 { t.Error("Wrong number of topics in request") } } @@ -166,7 +166,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { t.Error("Wrong request version") } - for _, msgBlock := range req.msgSets["t1"][0].Messages { + for _, msgBlock := range req.records["t1"][0].msgSet.Messages { msg := msgBlock.Msg err := msg.decodeSet() if err != nil { @@ -183,3 +183,40 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) { } } } + +func TestProduceSetV3RequestBuilding(t *testing.T) { + parent, ps := makeProduceSet() + parent.conf.Producer.RequiredAcks = WaitForAll + parent.conf.Producer.Timeout = 10 * time.Second + parent.conf.Version = V0_11_0_0 + + now := time.Now() + msg := &ProducerMessage{ + Topic: "t1", + Partition: 0, + Key: StringEncoder(TestMessage), + Value: StringEncoder(TestMessage), + Timestamp: now, + } + for i := 0; i < 10; i++ { + safeAddMessage(t, ps, msg) + msg.Timestamp = msg.Timestamp.Add(time.Second) + } + + req := ps.buildRequest() + + if req.Version != 3 { + t.Error("Wrong request version") + } + + batch := req.records["t1"][0].recordBatch + if batch.FirstTimestamp != now { + t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp) + } + for i := 0; i < 10; i++ { + rec := batch.Records[i] + if rec.TimestampDelta != time.Duration(i)*time.Second { + t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta) + } + } +} diff --git a/vendor/github.com/Shopify/sarama/real_decoder.go b/vendor/github.com/Shopify/sarama/real_decoder.go index 3cf93533a..3ff8212ad 100644 --- a/vendor/github.com/Shopify/sarama/real_decoder.go +++ b/vendor/github.com/Shopify/sarama/real_decoder.go @@ -7,8 +7,10 @@ import ( var errInvalidArrayLength = PacketDecodingError{"invalid array length"} var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} +var errInvalidByteSliceLengthType = PacketDecodingError{"invalid byteslice length type"} var errInvalidStringLength = PacketDecodingError{"invalid string length"} var errInvalidSubsetSize = PacketDecodingError{"invalid subset size"} +var errVarintOverflow = PacketDecodingError{"varint overflow"} type realDecoder struct { raw []byte @@ -58,12 +60,26 @@ func (rd *realDecoder) getInt64() (int64, error) { return tmp, nil } +func (rd *realDecoder) getVarint() (int64, error) { + tmp, n := binary.Varint(rd.raw[rd.off:]) + if n == 0 { + rd.off = len(rd.raw) + return -1, ErrInsufficientData + } + if n < 0 { + rd.off -= n + return -1, errVarintOverflow + } + rd.off += n + return tmp, nil +} + func (rd *realDecoder) getArrayLength() (int, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) return -1, ErrInsufficientData } - tmp := int(binary.BigEndian.Uint32(rd.raw[rd.off:])) + tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))) rd.off += 4 if tmp > rd.remaining() { rd.off = len(rd.raw) @@ -78,28 +94,26 @@ func (rd *realDecoder) getArrayLength() (int, error) { func (rd *realDecoder) getBytes() ([]byte, error) { tmp, err := rd.getInt32() - if err != nil { return nil, err } + if tmp == -1 { + return nil, nil + } - n := int(tmp) + return rd.getRawBytes(int(tmp)) +} - switch { - case n < -1: - return nil, errInvalidByteSliceLength - case n == -1: +func (rd *realDecoder) getVarintBytes() ([]byte, error) { + tmp, err := rd.getVarint() + if err != nil { + return nil, err + } + if tmp == -1 { return nil, nil - case n == 0: - return make([]byte, 0), nil - case n > rd.remaining(): - rd.off = len(rd.raw) - return nil, ErrInsufficientData } - tmpStr := rd.raw[rd.off : rd.off+n] - rd.off += n - return tmpStr, nil + return rd.getRawBytes(int(tmp)) } func (rd *realDecoder) getString() (string, error) { @@ -128,6 +142,15 @@ func (rd *realDecoder) getString() (string, error) { return tmpStr, nil } +func (rd *realDecoder) getNullableString() (*string, error) { + tmp, err := rd.getInt16() + if err != nil || tmp == -1 { + return nil, err + } + str, err := rd.getString() + return &str, err +} + func (rd *realDecoder) getInt32Array() ([]int32, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) @@ -221,8 +244,16 @@ func (rd *realDecoder) remaining() int { } func (rd *realDecoder) getSubset(length int) (packetDecoder, error) { + buf, err := rd.getRawBytes(length) + if err != nil { + return nil, err + } + return &realDecoder{raw: buf}, nil +} + +func (rd *realDecoder) getRawBytes(length int) ([]byte, error) { if length < 0 { - return nil, errInvalidSubsetSize + return nil, errInvalidByteSliceLength } else if length > rd.remaining() { rd.off = len(rd.raw) return nil, ErrInsufficientData @@ -230,7 +261,7 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) { start := rd.off rd.off += length - return &realDecoder{raw: rd.raw[start:rd.off]}, nil + return rd.raw[start:rd.off], nil } // stacks @@ -238,10 +269,17 @@ func (rd *realDecoder) getSubset(length int) (packetDecoder, error) { func (rd *realDecoder) push(in pushDecoder) error { in.saveOffset(rd.off) - reserve := in.reserveLength() - if rd.remaining() < reserve { - rd.off = len(rd.raw) - return ErrInsufficientData + var reserve int + if dpd, ok := in.(dynamicPushDecoder); ok { + if err := dpd.decode(rd); err != nil { + return err + } + } else { + reserve = in.reserveLength() + if rd.remaining() < reserve { + rd.off = len(rd.raw) + return ErrInsufficientData + } } rd.stack = append(rd.stack, in) diff --git a/vendor/github.com/Shopify/sarama/real_encoder.go b/vendor/github.com/Shopify/sarama/real_encoder.go index ced4267c3..51112e70c 100644 --- a/vendor/github.com/Shopify/sarama/real_encoder.go +++ b/vendor/github.com/Shopify/sarama/real_encoder.go @@ -35,6 +35,10 @@ func (re *realEncoder) putInt64(in int64) { re.off += 8 } +func (re *realEncoder) putVarint(in int64) { + re.off += binary.PutVarint(re.raw[re.off:], in) +} + func (re *realEncoder) putArrayLength(in int) error { re.putInt32(int32(in)) return nil @@ -54,9 +58,16 @@ func (re *realEncoder) putBytes(in []byte) error { return nil } re.putInt32(int32(len(in))) - copy(re.raw[re.off:], in) - re.off += len(in) - return nil + return re.putRawBytes(in) +} + +func (re *realEncoder) putVarintBytes(in []byte) error { + if in == nil { + re.putVarint(-1) + return nil + } + re.putVarint(int64(len(in))) + return re.putRawBytes(in) } func (re *realEncoder) putString(in string) error { @@ -66,6 +77,14 @@ func (re *realEncoder) putString(in string) error { return nil } +func (re *realEncoder) putNullableString(in *string) error { + if in == nil { + re.putInt16(-1) + return nil + } + return re.putString(*in) +} + func (re *realEncoder) putStringArray(in []string) error { err := re.putArrayLength(len(in)) if err != nil { diff --git a/vendor/github.com/Shopify/sarama/record.go b/vendor/github.com/Shopify/sarama/record.go new file mode 100644 index 000000000..cded308cf --- /dev/null +++ b/vendor/github.com/Shopify/sarama/record.go @@ -0,0 +1,113 @@ +package sarama + +import ( + "encoding/binary" + "time" +) + +const ( + controlMask = 0x20 + maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1 +) + +type RecordHeader struct { + Key []byte + Value []byte +} + +func (h *RecordHeader) encode(pe packetEncoder) error { + if err := pe.putVarintBytes(h.Key); err != nil { + return err + } + return pe.putVarintBytes(h.Value) +} + +func (h *RecordHeader) decode(pd packetDecoder) (err error) { + if h.Key, err = pd.getVarintBytes(); err != nil { + return err + } + + if h.Value, err = pd.getVarintBytes(); err != nil { + return err + } + return nil +} + +type Record struct { + Attributes int8 + TimestampDelta time.Duration + OffsetDelta int64 + Key []byte + Value []byte + Headers []*RecordHeader + + length varintLengthField +} + +func (r *Record) encode(pe packetEncoder) error { + pe.push(&r.length) + pe.putInt8(r.Attributes) + pe.putVarint(int64(r.TimestampDelta / time.Millisecond)) + pe.putVarint(r.OffsetDelta) + if err := pe.putVarintBytes(r.Key); err != nil { + return err + } + if err := pe.putVarintBytes(r.Value); err != nil { + return err + } + pe.putVarint(int64(len(r.Headers))) + + for _, h := range r.Headers { + if err := h.encode(pe); err != nil { + return err + } + } + + return pe.pop() +} + +func (r *Record) decode(pd packetDecoder) (err error) { + if err = pd.push(&r.length); err != nil { + return err + } + + if r.Attributes, err = pd.getInt8(); err != nil { + return err + } + + timestamp, err := pd.getVarint() + if err != nil { + return err + } + r.TimestampDelta = time.Duration(timestamp) * time.Millisecond + + if r.OffsetDelta, err = pd.getVarint(); err != nil { + return err + } + + if r.Key, err = pd.getVarintBytes(); err != nil { + return err + } + + if r.Value, err = pd.getVarintBytes(); err != nil { + return err + } + + numHeaders, err := pd.getVarint() + if err != nil { + return err + } + + if numHeaders >= 0 { + r.Headers = make([]*RecordHeader, numHeaders) + } + for i := int64(0); i < numHeaders; i++ { + hdr := new(RecordHeader) + if err := hdr.decode(pd); err != nil { + return err + } + r.Headers[i] = hdr + } + + return pd.pop() +} diff --git a/vendor/github.com/Shopify/sarama/record_batch.go b/vendor/github.com/Shopify/sarama/record_batch.go new file mode 100644 index 000000000..3c148be58 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/record_batch.go @@ -0,0 +1,260 @@ +package sarama + +import ( + "bytes" + "compress/gzip" + "fmt" + "io/ioutil" + "time" + + "github.com/eapache/go-xerial-snappy" + "github.com/pierrec/lz4" +) + +const recordBatchOverhead = 49 + +type recordsArray []*Record + +func (e recordsArray) encode(pe packetEncoder) error { + for _, r := range e { + if err := r.encode(pe); err != nil { + return err + } + } + return nil +} + +func (e recordsArray) decode(pd packetDecoder) error { + for i := range e { + rec := &Record{} + if err := rec.decode(pd); err != nil { + return err + } + e[i] = rec + } + return nil +} + +type RecordBatch struct { + FirstOffset int64 + PartitionLeaderEpoch int32 + Version int8 + Codec CompressionCodec + Control bool + LastOffsetDelta int32 + FirstTimestamp time.Time + MaxTimestamp time.Time + ProducerID int64 + ProducerEpoch int16 + FirstSequence int32 + Records []*Record + PartialTrailingRecord bool + + compressedRecords []byte + recordsLen int // uncompressed records size +} + +func (b *RecordBatch) encode(pe packetEncoder) error { + if b.Version != 2 { + return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} + } + pe.putInt64(b.FirstOffset) + pe.push(&lengthField{}) + pe.putInt32(b.PartitionLeaderEpoch) + pe.putInt8(b.Version) + pe.push(newCRC32Field(crcCastagnoli)) + pe.putInt16(b.computeAttributes()) + pe.putInt32(b.LastOffsetDelta) + + if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil { + return err + } + + if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil { + return err + } + + pe.putInt64(b.ProducerID) + pe.putInt16(b.ProducerEpoch) + pe.putInt32(b.FirstSequence) + + if err := pe.putArrayLength(len(b.Records)); err != nil { + return err + } + + if b.compressedRecords == nil { + if err := b.encodeRecords(pe); err != nil { + return err + } + } + if err := pe.putRawBytes(b.compressedRecords); err != nil { + return err + } + + if err := pe.pop(); err != nil { + return err + } + return pe.pop() +} + +func (b *RecordBatch) decode(pd packetDecoder) (err error) { + if b.FirstOffset, err = pd.getInt64(); err != nil { + return err + } + + batchLen, err := pd.getInt32() + if err != nil { + return err + } + + if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil { + return err + } + + if b.Version, err = pd.getInt8(); err != nil { + return err + } + + if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil { + return err + } + + attributes, err := pd.getInt16() + if err != nil { + return err + } + b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask) + b.Control = attributes&controlMask == controlMask + + if b.LastOffsetDelta, err = pd.getInt32(); err != nil { + return err + } + + if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil { + return err + } + + if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil { + return err + } + + if b.ProducerID, err = pd.getInt64(); err != nil { + return err + } + + if b.ProducerEpoch, err = pd.getInt16(); err != nil { + return err + } + + if b.FirstSequence, err = pd.getInt32(); err != nil { + return err + } + + numRecs, err := pd.getArrayLength() + if err != nil { + return err + } + if numRecs >= 0 { + b.Records = make([]*Record, numRecs) + } + + bufSize := int(batchLen) - recordBatchOverhead + recBuffer, err := pd.getRawBytes(bufSize) + if err != nil { + return err + } + + if err = pd.pop(); err != nil { + return err + } + + switch b.Codec { + case CompressionNone: + case CompressionGZIP: + reader, err := gzip.NewReader(bytes.NewReader(recBuffer)) + if err != nil { + return err + } + if recBuffer, err = ioutil.ReadAll(reader); err != nil { + return err + } + case CompressionSnappy: + if recBuffer, err = snappy.Decode(recBuffer); err != nil { + return err + } + case CompressionLZ4: + reader := lz4.NewReader(bytes.NewReader(recBuffer)) + if recBuffer, err = ioutil.ReadAll(reader); err != nil { + return err + } + default: + return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)} + } + + b.recordsLen = len(recBuffer) + err = decode(recBuffer, recordsArray(b.Records)) + if err == ErrInsufficientData { + b.PartialTrailingRecord = true + b.Records = nil + return nil + } + return err +} + +func (b *RecordBatch) encodeRecords(pe packetEncoder) error { + var raw []byte + if b.Codec != CompressionNone { + var err error + if raw, err = encode(recordsArray(b.Records), nil); err != nil { + return err + } + b.recordsLen = len(raw) + } + + switch b.Codec { + case CompressionNone: + offset := pe.offset() + if err := recordsArray(b.Records).encode(pe); err != nil { + return err + } + b.recordsLen = pe.offset() - offset + case CompressionGZIP: + var buf bytes.Buffer + writer := gzip.NewWriter(&buf) + if _, err := writer.Write(raw); err != nil { + return err + } + if err := writer.Close(); err != nil { + return err + } + b.compressedRecords = buf.Bytes() + case CompressionSnappy: + b.compressedRecords = snappy.Encode(raw) + case CompressionLZ4: + var buf bytes.Buffer + writer := lz4.NewWriter(&buf) + if _, err := writer.Write(raw); err != nil { + return err + } + if err := writer.Close(); err != nil { + return err + } + b.compressedRecords = buf.Bytes() + default: + return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)} + } + + return nil +} + +func (b *RecordBatch) computeAttributes() int16 { + attr := int16(b.Codec) & int16(compressionCodecMask) + if b.Control { + attr |= controlMask + } + return attr +} + +func (b *RecordBatch) addRecord(r *Record) { + b.Records = append(b.Records, r) +} diff --git a/vendor/github.com/Shopify/sarama/record_test.go b/vendor/github.com/Shopify/sarama/record_test.go new file mode 100644 index 000000000..68824edee --- /dev/null +++ b/vendor/github.com/Shopify/sarama/record_test.go @@ -0,0 +1,284 @@ +package sarama + +import ( + "reflect" + "runtime" + "strconv" + "strings" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" +) + +var recordBatchTestCases = []struct { + name string + batch RecordBatch + encoded []byte + oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8 +}{ + { + name: "empty record", + batch: RecordBatch{ + Version: 2, + FirstTimestamp: time.Unix(0, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{}, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 49, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 89, 95, 183, 221, // CRC + 0, 0, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 0, // Number of Records + }, + }, + { + name: "control batch", + batch: RecordBatch{ + Version: 2, + Control: true, + FirstTimestamp: time.Unix(0, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{}, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 49, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 81, 46, 67, 217, // CRC + 0, 32, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 0, // Number of Records + }, + }, + { + name: "uncompressed record", + batch: RecordBatch{ + Version: 2, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{1, 2, 3, 4}, + Value: []byte{5, 6, 7}, + Headers: []*RecordHeader{{ + Key: []byte{8, 9, 10}, + Value: []byte{11, 12}, + }}, + }}, + recordsLen: 21, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 70, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 84, 121, 97, 253, // CRC + 0, 0, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 1, // Number of Records + 40, // Record Length + 0, // Attributes + 10, // Timestamp Delta + 0, // Offset Delta + 8, // Key Length + 1, 2, 3, 4, + 6, // Value Length + 5, 6, 7, + 2, // Number of Headers + 6, // Header Key Length + 8, 9, 10, // Header Key + 4, // Header Value Length + 11, 12, // Header Value + }, + }, + { + name: "gzipped record", + batch: RecordBatch{ + Version: 2, + Codec: CompressionGZIP, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{1, 2, 3, 4}, + Value: []byte{5, 6, 7}, + Headers: []*RecordHeader{{ + Key: []byte{8, 9, 10}, + Value: []byte{11, 12}, + }}, + }}, + recordsLen: 21, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 94, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 159, 236, 182, 189, // CRC + 0, 1, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 1, // Number of Records + 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101, + 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0, + }, + oldGoEncoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 94, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 0, 216, 14, 210, // CRC + 0, 1, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 1, // Number of Records + 31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101, + 99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0, + }, + }, + { + name: "snappy compressed record", + batch: RecordBatch{ + Version: 2, + Codec: CompressionSnappy, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{1, 2, 3, 4}, + Value: []byte{5, 6, 7}, + Headers: []*RecordHeader{{ + Key: []byte{8, 9, 10}, + Value: []byte{11, 12}, + }}, + }}, + recordsLen: 21, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 72, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 21, 0, 159, 97, // CRC + 0, 2, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 1, // Number of Records + 21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12, + }, + }, + { + name: "lz4 compressed record", + batch: RecordBatch{ + Version: 2, + Codec: CompressionLZ4, + FirstTimestamp: time.Unix(1479847795, 0), + MaxTimestamp: time.Unix(0, 0), + Records: []*Record{{ + TimestampDelta: 5 * time.Millisecond, + Key: []byte{1, 2, 3, 4}, + Value: []byte{5, 6, 7}, + Headers: []*RecordHeader{{ + Key: []byte{8, 9, 10}, + Value: []byte{11, 12}, + }}, + }}, + recordsLen: 21, + }, + encoded: []byte{ + 0, 0, 0, 0, 0, 0, 0, 0, // First Offset + 0, 0, 0, 89, // Length + 0, 0, 0, 0, // Partition Leader Epoch + 2, // Version + 169, 74, 119, 197, // CRC + 0, 3, // Attributes + 0, 0, 0, 0, // Last Offset Delta + 0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp + 0, 0, 0, 0, 0, 0, 0, 0, // Producer ID + 0, 0, // Producer Epoch + 0, 0, 0, 0, // First Sequence + 0, 0, 0, 1, // Number of Records + 4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, + 6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146, + }, + }, +} + +func isOldGo(t *testing.T) bool { + v := strings.Split(runtime.Version()[2:], ".") + if len(v) < 2 { + t.Logf("Can't parse version: %s", runtime.Version()) + return false + } + maj, err := strconv.Atoi(v[0]) + if err != nil { + t.Logf("Can't parse version: %s", runtime.Version()) + return false + } + min, err := strconv.Atoi(v[1]) + if err != nil { + t.Logf("Can't parse version: %s", runtime.Version()) + return false + } + return maj < 1 || (maj == 1 && min < 8) +} + +func TestRecordBatchEncoding(t *testing.T) { + for _, tc := range recordBatchTestCases { + if tc.oldGoEncoded != nil && isOldGo(t) { + testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded) + } else { + testEncodable(t, tc.name, &tc.batch, tc.encoded) + } + } +} + +func TestRecordBatchDecoding(t *testing.T) { + for _, tc := range recordBatchTestCases { + batch := RecordBatch{} + testDecodable(t, tc.name, &batch, tc.encoded) + for _, r := range batch.Records { + r.length = varintLengthField{} + } + for _, r := range tc.batch.Records { + r.length = varintLengthField{} + } + if !reflect.DeepEqual(batch, tc.batch) { + t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch)) + } + } +} diff --git a/vendor/github.com/Shopify/sarama/records.go b/vendor/github.com/Shopify/sarama/records.go new file mode 100644 index 000000000..2b7953a46 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/records.go @@ -0,0 +1,96 @@ +package sarama + +import "fmt" + +const ( + legacyRecords = iota + defaultRecords +) + +// Records implements a union type containing either a RecordBatch or a legacy MessageSet. +type Records struct { + recordsType int + msgSet *MessageSet + recordBatch *RecordBatch +} + +func newLegacyRecords(msgSet *MessageSet) Records { + return Records{recordsType: legacyRecords, msgSet: msgSet} +} + +func newDefaultRecords(batch *RecordBatch) Records { + return Records{recordsType: defaultRecords, recordBatch: batch} +} + +func (r *Records) encode(pe packetEncoder) error { + switch r.recordsType { + case legacyRecords: + if r.msgSet == nil { + return nil + } + return r.msgSet.encode(pe) + case defaultRecords: + if r.recordBatch == nil { + return nil + } + return r.recordBatch.encode(pe) + } + return fmt.Errorf("unknown records type: %v", r.recordsType) +} + +func (r *Records) decode(pd packetDecoder) error { + switch r.recordsType { + case legacyRecords: + r.msgSet = &MessageSet{} + return r.msgSet.decode(pd) + case defaultRecords: + r.recordBatch = &RecordBatch{} + return r.recordBatch.decode(pd) + } + return fmt.Errorf("unknown records type: %v", r.recordsType) +} + +func (r *Records) numRecords() (int, error) { + switch r.recordsType { + case legacyRecords: + if r.msgSet == nil { + return 0, nil + } + return len(r.msgSet.Messages), nil + case defaultRecords: + if r.recordBatch == nil { + return 0, nil + } + return len(r.recordBatch.Records), nil + } + return 0, fmt.Errorf("unknown records type: %v", r.recordsType) +} + +func (r *Records) isPartial() (bool, error) { + switch r.recordsType { + case legacyRecords: + if r.msgSet == nil { + return false, nil + } + return r.msgSet.PartialTrailingMessage, nil + case defaultRecords: + if r.recordBatch == nil { + return false, nil + } + return r.recordBatch.PartialTrailingRecord, nil + } + return false, fmt.Errorf("unknown records type: %v", r.recordsType) +} + +func (r *Records) isControl() (bool, error) { + switch r.recordsType { + case legacyRecords: + return false, nil + case defaultRecords: + if r.recordBatch == nil { + return false, nil + } + return r.recordBatch.Control, nil + } + return false, fmt.Errorf("unknown records type: %v", r.recordsType) +} diff --git a/vendor/github.com/Shopify/sarama/records_test.go b/vendor/github.com/Shopify/sarama/records_test.go new file mode 100644 index 000000000..ff3e64412 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/records_test.go @@ -0,0 +1,137 @@ +package sarama + +import ( + "bytes" + "reflect" + "testing" +) + +func TestLegacyRecords(t *testing.T) { + set := &MessageSet{ + Messages: []*MessageBlock{ + { + Msg: &Message{ + Version: 1, + }, + }, + }, + } + r := newLegacyRecords(set) + + exp, err := encode(set, nil) + if err != nil { + t.Fatal(err) + } + buf, err := encode(&r, nil) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, exp) { + t.Errorf("Wrong encoding for legacy records, wanted %v, got %v", exp, buf) + } + + set = &MessageSet{} + r = newLegacyRecords(nil) + + err = decode(exp, set) + if err != nil { + t.Fatal(err) + } + err = decode(buf, &r) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(set, r.msgSet) { + t.Errorf("Wrong decoding for legacy records, wanted %#+v, got %#+v", set, r.msgSet) + } + + n, err := r.numRecords() + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Errorf("Wrong number of records, wanted 1, got %d", n) + } + + p, err := r.isPartial() + if err != nil { + t.Fatal(err) + } + if p { + t.Errorf("MessageSet shouldn't have a partial trailing message") + } + + c, err := r.isControl() + if err != nil { + t.Fatal(err) + } + if c { + t.Errorf("MessageSet can't be a control batch") + } +} + +func TestDefaultRecords(t *testing.T) { + batch := &RecordBatch{ + Version: 2, + Records: []*Record{ + { + Value: []byte{1}, + }, + }, + } + + r := newDefaultRecords(batch) + + exp, err := encode(batch, nil) + if err != nil { + t.Fatal(err) + } + buf, err := encode(&r, nil) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, exp) { + t.Errorf("Wrong encoding for default records, wanted %v, got %v", exp, buf) + } + + batch = &RecordBatch{} + r = newDefaultRecords(nil) + + err = decode(exp, batch) + if err != nil { + t.Fatal(err) + } + err = decode(buf, &r) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(batch, r.recordBatch) { + t.Errorf("Wrong decoding for default records, wanted %#+v, got %#+v", batch, r.recordBatch) + } + + n, err := r.numRecords() + if err != nil { + t.Fatal(err) + } + if n != 1 { + t.Errorf("Wrong number of records, wanted 1, got %d", n) + } + + p, err := r.isPartial() + if err != nil { + t.Fatal(err) + } + if p { + t.Errorf("RecordBatch shouldn't have a partial trailing record") + } + + c, err := r.isControl() + if err != nil { + t.Fatal(err) + } + if c { + t.Errorf("RecordBatch shouldn't be a control batch") + } +} diff --git a/vendor/github.com/Shopify/sarama/timestamp.go b/vendor/github.com/Shopify/sarama/timestamp.go new file mode 100644 index 000000000..372278d0b --- /dev/null +++ b/vendor/github.com/Shopify/sarama/timestamp.go @@ -0,0 +1,40 @@ +package sarama + +import ( + "fmt" + "time" +) + +type Timestamp struct { + *time.Time +} + +func (t Timestamp) encode(pe packetEncoder) error { + timestamp := int64(-1) + + if !t.Before(time.Unix(0, 0)) { + timestamp = t.UnixNano() / int64(time.Millisecond) + } else if !t.IsZero() { + return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)} + } + + pe.putInt64(timestamp) + return nil +} + +func (t Timestamp) decode(pd packetDecoder) error { + millis, err := pd.getInt64() + if err != nil { + return err + } + + // negative timestamps are invalid, in these cases we should return + // a zero time + timestamp := time.Time{} + if millis >= 0 { + timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) + } + + *t.Time = timestamp + return nil +} diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go index dc0e7e947..df60556fd 100644 --- a/vendor/github.com/Shopify/sarama/utils.go +++ b/vendor/github.com/Shopify/sarama/utils.go @@ -146,5 +146,6 @@ var ( V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) + V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) minVersion = V0_8_2_0 ) diff --git a/vendor/github.com/apache/thrift/CONTRIBUTING.md b/vendor/github.com/apache/thrift/CONTRIBUTING.md index 6268dcf0d..04603592b 100644 --- a/vendor/github.com/apache/thrift/CONTRIBUTING.md +++ b/vendor/github.com/apache/thrift/CONTRIBUTING.md @@ -6,6 +6,14 @@ Thank you for your interest in contributing to the Apache Thrift project! Infor * [Get involved!](http://www.apache.org/foundation/getinvolved.html) * [Legal aspects on Submission of Contributions (Patches)](http://www.apache.org/licenses/LICENSE-2.0.html#contributions) +## If you want to build the project locally ## + +For Windows systems, see our detailed instructions on the [CMake README](/build/cmake/README.md). + +For Windows Native C++ builds, see our detailed instructions on the [WinCPP README](/build/wincpp/README.md). + +For unix systems, see our detailed instructions on the [Docker README](/build/docker/README.md). + ## If you want to review open issues... ## 1. Review the [GitHub Pull Request Backlog](https://github.com/apache/thrift/pulls). Code reviews are open to all. diff --git a/vendor/github.com/apache/thrift/README.md b/vendor/github.com/apache/thrift/README.md index 423068150..2ef9a5ea0 100644 --- a/vendor/github.com/apache/thrift/README.md +++ b/vendor/github.com/apache/thrift/README.md @@ -1,7 +1,7 @@ Apache Thrift ============= -Last Modified: 2014-03-16 +Last Modified: 2017--10 License ======= @@ -171,3 +171,8 @@ To run the cross-language test suite, please run: This will run a set of tests that use different language clients and servers. +Development +=========== + +To build the same way Travis CI builds the project you should use docker. +We have [comprehensive building instructions for docker](build/docker/README.md). diff --git a/vendor/github.com/asaskevich/govalidator/CONTRIBUTING.md b/vendor/github.com/asaskevich/govalidator/CONTRIBUTING.md new file mode 100644 index 000000000..57e723846 --- /dev/null +++ b/vendor/github.com/asaskevich/govalidator/CONTRIBUTING.md @@ -0,0 +1,26 @@ +#### Support +If you do have a contribution to the package, feel free to create a Pull Request or an Issue. + +#### What to contribute +If you don't know what to do, there are some features and functions that need to be done + +- [ ] Refactor code +- [ ] Edit docs and [README](https://github.com/asaskevich/govalidator/README.md): spellcheck, grammar and typo check +- [ ] Create actual list of contributors and projects that currently using this package +- [ ] Resolve [issues and bugs](https://github.com/asaskevich/govalidator/issues) +- [ ] Update actual [list of functions](https://github.com/asaskevich/govalidator#list-of-functions) +- [ ] Update [list of validators](https://github.com/asaskevich/govalidator#validatestruct-2) that available for `ValidateStruct` and add new +- [ ] Implement new validators: `IsFQDN`, `IsIMEI`, `IsPostalCode`, `IsISIN`, `IsISRC` etc +- [ ] Implement [validation by maps](https://github.com/asaskevich/govalidator/issues/224) +- [ ] Implement fuzzing testing +- [ ] Implement some struct/map/array utilities +- [ ] Implement map/array validation +- [ ] Implement benchmarking +- [ ] Implement batch of examples +- [ ] Look at forks for new features and fixes + +#### Advice +Feel free to create what you want, but keep in mind when you implement new features: +- Code must be clear and readable, names of variables/constants clearly describes what they are doing +- Public functions must be documented and described in source file and added to README.md to the list of available functions +- There are must be unit-tests for any new functions and improvements
\ No newline at end of file diff --git a/vendor/github.com/asaskevich/govalidator/README.md b/vendor/github.com/asaskevich/govalidator/README.md index 9d2e1357b..223880940 100644 --- a/vendor/github.com/asaskevich/govalidator/README.md +++ b/vendor/github.com/asaskevich/govalidator/README.md @@ -156,6 +156,7 @@ func IsPort(str string) bool func IsPositive(value float64) bool func IsPrintableASCII(str string) bool func IsRFC3339(str string) bool +func IsRFC3339WithoutZone(str string) bool func IsRGBcolor(str string) bool func IsRequestURI(rawurl string) bool func IsRequestURL(rawurl string) bool @@ -269,56 +270,57 @@ For completely custom validators (interface-based), see below. Here is a list of available validators for struct fields (validator - used function): ```go -"email": IsEmail, -"url": IsURL, -"dialstring": IsDialString, -"requrl": IsRequestURL, -"requri": IsRequestURI, -"alpha": IsAlpha, -"utfletter": IsUTFLetter, -"alphanum": IsAlphanumeric, -"utfletternum": IsUTFLetterNumeric, -"numeric": IsNumeric, -"utfnumeric": IsUTFNumeric, -"utfdigit": IsUTFDigit, -"hexadecimal": IsHexadecimal, -"hexcolor": IsHexcolor, -"rgbcolor": IsRGBcolor, -"lowercase": IsLowerCase, -"uppercase": IsUpperCase, -"int": IsInt, -"float": IsFloat, -"null": IsNull, -"uuid": IsUUID, -"uuidv3": IsUUIDv3, -"uuidv4": IsUUIDv4, -"uuidv5": IsUUIDv5, -"creditcard": IsCreditCard, -"isbn10": IsISBN10, -"isbn13": IsISBN13, -"json": IsJSON, -"multibyte": IsMultibyte, -"ascii": IsASCII, -"printableascii": IsPrintableASCII, -"fullwidth": IsFullWidth, -"halfwidth": IsHalfWidth, -"variablewidth": IsVariableWidth, -"base64": IsBase64, -"datauri": IsDataURI, -"ip": IsIP, -"port": IsPort, -"ipv4": IsIPv4, -"ipv6": IsIPv6, -"dns": IsDNSName, -"host": IsHost, -"mac": IsMAC, -"latitude": IsLatitude, -"longitude": IsLongitude, -"ssn": IsSSN, -"semver": IsSemver, -"rfc3339": IsRFC3339, -"ISO3166Alpha2": IsISO3166Alpha2, -"ISO3166Alpha3": IsISO3166Alpha3, +"email": IsEmail, +"url": IsURL, +"dialstring": IsDialString, +"requrl": IsRequestURL, +"requri": IsRequestURI, +"alpha": IsAlpha, +"utfletter": IsUTFLetter, +"alphanum": IsAlphanumeric, +"utfletternum": IsUTFLetterNumeric, +"numeric": IsNumeric, +"utfnumeric": IsUTFNumeric, +"utfdigit": IsUTFDigit, +"hexadecimal": IsHexadecimal, +"hexcolor": IsHexcolor, +"rgbcolor": IsRGBcolor, +"lowercase": IsLowerCase, +"uppercase": IsUpperCase, +"int": IsInt, +"float": IsFloat, +"null": IsNull, +"uuid": IsUUID, +"uuidv3": IsUUIDv3, +"uuidv4": IsUUIDv4, +"uuidv5": IsUUIDv5, +"creditcard": IsCreditCard, +"isbn10": IsISBN10, +"isbn13": IsISBN13, +"json": IsJSON, +"multibyte": IsMultibyte, +"ascii": IsASCII, +"printableascii": IsPrintableASCII, +"fullwidth": IsFullWidth, +"halfwidth": IsHalfWidth, +"variablewidth": IsVariableWidth, +"base64": IsBase64, +"datauri": IsDataURI, +"ip": IsIP, +"port": IsPort, +"ipv4": IsIPv4, +"ipv6": IsIPv6, +"dns": IsDNSName, +"host": IsHost, +"mac": IsMAC, +"latitude": IsLatitude, +"longitude": IsLongitude, +"ssn": IsSSN, +"semver": IsSemver, +"rfc3339": IsRFC3339, +"rfc3339WithoutZone": IsRFC3339WithoutZone, +"ISO3166Alpha2": IsISO3166Alpha2, +"ISO3166Alpha3": IsISO3166Alpha3, ``` Validators with parameters @@ -409,7 +411,31 @@ Documentation is available here: [godoc.org](https://godoc.org/github.com/asaske Full information about code coverage is also available here: [govalidator on gocover.io](http://gocover.io/github.com/asaskevich/govalidator). #### Support -If you do have a contribution for the package feel free to put up a Pull Request or open Issue. +If you do have a contribution to the package, feel free to create a Pull Request or an Issue. + +#### What to contribute +If you don't know what to do, there are some features and functions that need to be done + +- [ ] Refactor code +- [ ] Edit docs and [README](https://github.com/asaskevich/govalidator/README.md): spellcheck, grammar and typo check +- [ ] Create actual list of contributors and projects that currently using this package +- [ ] Resolve [issues and bugs](https://github.com/asaskevich/govalidator/issues) +- [ ] Update actual [list of functions](https://github.com/asaskevich/govalidator#list-of-functions) +- [ ] Update [list of validators](https://github.com/asaskevich/govalidator#validatestruct-2) that available for `ValidateStruct` and add new +- [ ] Implement new validators: `IsFQDN`, `IsIMEI`, `IsPostalCode`, `IsISIN`, `IsISRC` etc +- [ ] Implement [validation by maps](https://github.com/asaskevich/govalidator/issues/224) +- [ ] Implement fuzzing testing +- [ ] Implement some struct/map/array utilities +- [ ] Implement map/array validation +- [ ] Implement benchmarking +- [ ] Implement batch of examples +- [ ] Look at forks for new features and fixes + +#### Advice +Feel free to create what you want, but keep in mind when you implement new features: +- Code must be clear and readable, names of variables/constants clearly describes what they are doing +- Public functions must be documented and described in source file and added to README.md to the list of available functions +- There are must be unit-tests for any new functions and improvements #### Special thanks to [contributors](https://github.com/asaskevich/govalidator/graphs/contributors) * [Daniel Lohse](https://github.com/annismckenzie) diff --git a/vendor/github.com/asaskevich/govalidator/error.go b/vendor/github.com/asaskevich/govalidator/error.go index 280b1c455..b9c32079b 100644 --- a/vendor/github.com/asaskevich/govalidator/error.go +++ b/vendor/github.com/asaskevich/govalidator/error.go @@ -1,5 +1,7 @@ package govalidator +import "strings" + // Errors is an array of multiple errors and conforms to the error interface. type Errors []error @@ -9,11 +11,11 @@ func (es Errors) Errors() []error { } func (es Errors) Error() string { - var err string + var errs []string for _, e := range es { - err += e.Error() + ";" + errs = append(errs, e.Error()) } - return err + return strings.Join(errs, ";") } // Error encapsulates a name, an error and whether there's a custom error message or not. @@ -21,6 +23,9 @@ type Error struct { Name string Err error CustomErrorMessageExists bool + + // Validator indicates the name of the validator that failed + Validator string } func (e Error) Error() string { diff --git a/vendor/github.com/asaskevich/govalidator/error_test.go b/vendor/github.com/asaskevich/govalidator/error_test.go index 274cc0dfa..e673f2824 100644 --- a/vendor/github.com/asaskevich/govalidator/error_test.go +++ b/vendor/github.com/asaskevich/govalidator/error_test.go @@ -15,10 +15,10 @@ func TestErrorsToString(t *testing.T) { expected string }{ {Errors{}, ""}, - {Errors{fmt.Errorf("Error 1")}, "Error 1;"}, - {Errors{fmt.Errorf("Error 1"), fmt.Errorf("Error 2")}, "Error 1;Error 2;"}, - {Errors{customErr, fmt.Errorf("Error 2")}, "Custom Error Name: stdlib error;Error 2;"}, - {Errors{fmt.Errorf("Error 123"), customErrWithCustomErrorMessage}, "Error 123;Bad stuff happened;"}, + {Errors{fmt.Errorf("Error 1")}, "Error 1"}, + {Errors{fmt.Errorf("Error 1"), fmt.Errorf("Error 2")}, "Error 1;Error 2"}, + {Errors{customErr, fmt.Errorf("Error 2")}, "Custom Error Name: stdlib error;Error 2"}, + {Errors{fmt.Errorf("Error 123"), customErrWithCustomErrorMessage}, "Error 123;Bad stuff happened"}, } for _, test := range tests { actual := test.param1.Error() diff --git a/vendor/github.com/asaskevich/govalidator/numerics.go b/vendor/github.com/asaskevich/govalidator/numerics.go index 5be281f24..d0140d421 100644 --- a/vendor/github.com/asaskevich/govalidator/numerics.go +++ b/vendor/github.com/asaskevich/govalidator/numerics.go @@ -1,6 +1,9 @@ package govalidator -import "math" +import ( + "math" + "reflect" +) // Abs returns absolute value of number func Abs(value float64) float64 { @@ -39,13 +42,47 @@ func IsNonPositive(value float64) bool { } // InRange returns true if value lies between left and right border -func InRange(value, left, right float64) bool { +func InRangeInt(value, left, right int) bool { if left > right { left, right = right, left } return value >= left && value <= right } +// InRange returns true if value lies between left and right border +func InRangeFloat32(value, left, right float32) bool { + if left > right { + left, right = right, left + } + return value >= left && value <= right +} + +// InRange returns true if value lies between left and right border +func InRangeFloat64(value, left, right float64) bool { + if left > right { + left, right = right, left + } + return value >= left && value <= right +} + +// InRange returns true if value lies between left and right border, generic type to handle int, float32 or float64, all types must the same type +func InRange(value interface{}, left interface{}, right interface{}) bool { + + reflectValue := reflect.TypeOf(value).Kind() + reflectLeft := reflect.TypeOf(left).Kind() + reflectRight := reflect.TypeOf(right).Kind() + + if reflectValue == reflect.Int && reflectLeft == reflect.Int && reflectRight == reflect.Int { + return InRangeInt(value.(int), left.(int), right.(int)) + } else if reflectValue == reflect.Float32 && reflectLeft == reflect.Float32 && reflectRight == reflect.Float32 { + return InRangeFloat32(value.(float32), left.(float32), right.(float32)) + } else if reflectValue == reflect.Float64 && reflectLeft == reflect.Float64 && reflectRight == reflect.Float64 { + return InRangeFloat64(value.(float64), left.(float64), right.(float64)) + } else { + return false + } +} + // IsWhole returns true if value is whole number func IsWhole(value float64) bool { return math.Remainder(value, 1) == 0 diff --git a/vendor/github.com/asaskevich/govalidator/numerics_test.go b/vendor/github.com/asaskevich/govalidator/numerics_test.go index 1bad52107..ca743dfed 100644 --- a/vendor/github.com/asaskevich/govalidator/numerics_test.go +++ b/vendor/github.com/asaskevich/govalidator/numerics_test.go @@ -177,7 +177,60 @@ func TestIsNatural(t *testing.T) { } } } -func TestInRange(t *testing.T) { + +func TestInRangeInt(t *testing.T) { + t.Parallel() + + var tests = []struct { + param int + left int + right int + expected bool + }{ + {0, 0, 0, true}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, true}, + {0, 0, 1, true}, + {0, -1, 0, true}, + {0, 0, -1, true}, + {0, 10, 5, false}, + } + for _, test := range tests { + actual := InRangeInt(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRangeInt(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } +} + +func TestInRangeFloat32(t *testing.T) { + t.Parallel() + + var tests = []struct { + param float32 + left float32 + right float32 + expected bool + }{ + {0, 0, 0, true}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, true}, + {0, 0, 1, true}, + {0, -1, 0, true}, + {0, 0, -1, true}, + {0, 10, 5, false}, + } + for _, test := range tests { + actual := InRangeFloat32(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRangeFloat32(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } +} + +func TestInRangeFloat64(t *testing.T) { t.Parallel() var tests = []struct { @@ -196,6 +249,98 @@ func TestInRange(t *testing.T) { {0, 10, 5, false}, } for _, test := range tests { + actual := InRangeFloat64(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRangeFloat64(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } +} + +func TestInRange(t *testing.T) { + t.Parallel() + + var testsInt = []struct { + param int + left int + right int + expected bool + }{ + {0, 0, 0, true}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, true}, + {0, 0, 1, true}, + {0, -1, 0, true}, + {0, 0, -1, true}, + {0, 10, 5, false}, + } + for _, test := range testsInt { + actual := InRange(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } + + var testsFloat32 = []struct { + param float32 + left float32 + right float32 + expected bool + }{ + {0, 0, 0, true}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, true}, + {0, 0, 1, true}, + {0, -1, 0, true}, + {0, 0, -1, true}, + {0, 10, 5, false}, + } + for _, test := range testsFloat32 { + actual := InRange(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } + + var testsFloat64 = []struct { + param float64 + left float64 + right float64 + expected bool + }{ + {0, 0, 0, true}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, true}, + {0, 0, 1, true}, + {0, -1, 0, true}, + {0, 0, -1, true}, + {0, 10, 5, false}, + } + for _, test := range testsFloat64 { + actual := InRange(test.param, test.left, test.right) + if actual != test.expected { + t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) + } + } + + var testsTypeMix = []struct { + param int + left float64 + right float64 + expected bool + }{ + {0, 0, 0, false}, + {1, 0, 0, false}, + {-1, 0, 0, false}, + {0, -1, 1, false}, + {0, 0, 1, false}, + {0, -1, 0, false}, + {0, 0, -1, false}, + {0, 10, 5, false}, + } + for _, test := range testsTypeMix { actual := InRange(test.param, test.left, test.right) if actual != test.expected { t.Errorf("Expected InRange(%v, %v, %v) to be %v, got %v", test.param, test.left, test.right, test.expected, actual) diff --git a/vendor/github.com/asaskevich/govalidator/patterns.go b/vendor/github.com/asaskevich/govalidator/patterns.go index 529759559..4a34e2240 100644 --- a/vendor/github.com/asaskevich/govalidator/patterns.go +++ b/vendor/github.com/asaskevich/govalidator/patterns.go @@ -33,7 +33,6 @@ const ( IP string = `(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))` URLSchema string = `((ftp|tcp|udp|wss?|https?):\/\/)` URLUsername string = `(\S+(:\S*)?@)` - Hostname string = `` URLPath string = `((\/|\?|#)[^\s]*)` URLPort string = `(:(\d{1,5}))` URLIP string = `([1-9]\d?|1\d\d|2[01]\d|22[0-3])(\.(1?\d{1,2}|2[0-4]\d|25[0-5])){2}(?:\.([0-9]\d?|1\d\d|2[0-4]\d|25[0-4]))` diff --git a/vendor/github.com/asaskevich/govalidator/types.go b/vendor/github.com/asaskevich/govalidator/types.go index 9a5207c58..ddd30b122 100644 --- a/vendor/github.com/asaskevich/govalidator/types.go +++ b/vendor/github.com/asaskevich/govalidator/types.go @@ -34,6 +34,7 @@ var ParamTagMap = map[string]ParamValidator{ "stringlength": StringLength, "matches": StringMatches, "in": isInRaw, + "rsapub": IsRsaPub, } // ParamTagRegexMap maps param tags to their respective regexes. @@ -44,6 +45,7 @@ var ParamTagRegexMap = map[string]*regexp.Regexp{ "stringlength": regexp.MustCompile("^stringlength\\((\\d+)\\|(\\d+)\\)$"), "in": regexp.MustCompile(`^in\((.*)\)`), "matches": regexp.MustCompile(`^matches\((.+)\)$`), + "rsapub": regexp.MustCompile("^rsapub\\((\\d+)\\)$"), } type customTypeTagMap struct { @@ -72,57 +74,58 @@ var CustomTypeTagMap = &customTypeTagMap{validators: make(map[string]CustomTypeV // TagMap is a map of functions, that can be used as tags for ValidateStruct function. var TagMap = map[string]Validator{ - "email": IsEmail, - "url": IsURL, - "dialstring": IsDialString, - "requrl": IsRequestURL, - "requri": IsRequestURI, - "alpha": IsAlpha, - "utfletter": IsUTFLetter, - "alphanum": IsAlphanumeric, - "utfletternum": IsUTFLetterNumeric, - "numeric": IsNumeric, - "utfnumeric": IsUTFNumeric, - "utfdigit": IsUTFDigit, - "hexadecimal": IsHexadecimal, - "hexcolor": IsHexcolor, - "rgbcolor": IsRGBcolor, - "lowercase": IsLowerCase, - "uppercase": IsUpperCase, - "int": IsInt, - "float": IsFloat, - "null": IsNull, - "uuid": IsUUID, - "uuidv3": IsUUIDv3, - "uuidv4": IsUUIDv4, - "uuidv5": IsUUIDv5, - "creditcard": IsCreditCard, - "isbn10": IsISBN10, - "isbn13": IsISBN13, - "json": IsJSON, - "multibyte": IsMultibyte, - "ascii": IsASCII, - "printableascii": IsPrintableASCII, - "fullwidth": IsFullWidth, - "halfwidth": IsHalfWidth, - "variablewidth": IsVariableWidth, - "base64": IsBase64, - "datauri": IsDataURI, - "ip": IsIP, - "port": IsPort, - "ipv4": IsIPv4, - "ipv6": IsIPv6, - "dns": IsDNSName, - "host": IsHost, - "mac": IsMAC, - "latitude": IsLatitude, - "longitude": IsLongitude, - "ssn": IsSSN, - "semver": IsSemver, - "rfc3339": IsRFC3339, - "ISO3166Alpha2": IsISO3166Alpha2, - "ISO3166Alpha3": IsISO3166Alpha3, - "ISO4217": IsISO4217, + "email": IsEmail, + "url": IsURL, + "dialstring": IsDialString, + "requrl": IsRequestURL, + "requri": IsRequestURI, + "alpha": IsAlpha, + "utfletter": IsUTFLetter, + "alphanum": IsAlphanumeric, + "utfletternum": IsUTFLetterNumeric, + "numeric": IsNumeric, + "utfnumeric": IsUTFNumeric, + "utfdigit": IsUTFDigit, + "hexadecimal": IsHexadecimal, + "hexcolor": IsHexcolor, + "rgbcolor": IsRGBcolor, + "lowercase": IsLowerCase, + "uppercase": IsUpperCase, + "int": IsInt, + "float": IsFloat, + "null": IsNull, + "uuid": IsUUID, + "uuidv3": IsUUIDv3, + "uuidv4": IsUUIDv4, + "uuidv5": IsUUIDv5, + "creditcard": IsCreditCard, + "isbn10": IsISBN10, + "isbn13": IsISBN13, + "json": IsJSON, + "multibyte": IsMultibyte, + "ascii": IsASCII, + "printableascii": IsPrintableASCII, + "fullwidth": IsFullWidth, + "halfwidth": IsHalfWidth, + "variablewidth": IsVariableWidth, + "base64": IsBase64, + "datauri": IsDataURI, + "ip": IsIP, + "port": IsPort, + "ipv4": IsIPv4, + "ipv6": IsIPv6, + "dns": IsDNSName, + "host": IsHost, + "mac": IsMAC, + "latitude": IsLatitude, + "longitude": IsLongitude, + "ssn": IsSSN, + "semver": IsSemver, + "rfc3339": IsRFC3339, + "rfc3339WithoutZone": IsRFC3339WithoutZone, + "ISO3166Alpha2": IsISO3166Alpha2, + "ISO3166Alpha3": IsISO3166Alpha3, + "ISO4217": IsISO4217, } // ISO3166Entry stores country codes diff --git a/vendor/github.com/asaskevich/govalidator/utils.go b/vendor/github.com/asaskevich/govalidator/utils.go index 888c12751..78ed3fbab 100644 --- a/vendor/github.com/asaskevich/govalidator/utils.go +++ b/vendor/github.com/asaskevich/govalidator/utils.go @@ -108,7 +108,7 @@ func CamelCaseToUnderscore(str string) string { var output []rune var segment []rune for _, r := range str { - if !unicode.IsLower(r) { + if !unicode.IsLower(r) && string(r) != "_" { output = addSegment(output, segment) segment = nil } diff --git a/vendor/github.com/asaskevich/govalidator/utils_test.go b/vendor/github.com/asaskevich/govalidator/utils_test.go index 154f31594..5ad8faeb4 100644 --- a/vendor/github.com/asaskevich/govalidator/utils_test.go +++ b/vendor/github.com/asaskevich/govalidator/utils_test.go @@ -269,6 +269,7 @@ func TestCamelCaseToUnderscore(t *testing.T) { {"MyFunc", "my_func"}, {"ABC", "a_b_c"}, {"1B", "1_b"}, + {"foo_bar", "foo_bar"}, } for _, test := range tests { actual := CamelCaseToUnderscore(test.param) diff --git a/vendor/github.com/asaskevich/govalidator/validator.go b/vendor/github.com/asaskevich/govalidator/validator.go index b699e4449..7c158c5f8 100644 --- a/vendor/github.com/asaskevich/govalidator/validator.go +++ b/vendor/github.com/asaskevich/govalidator/validator.go @@ -2,8 +2,14 @@ package govalidator import ( + "bytes" + "crypto/rsa" + "crypto/x509" + "encoding/base64" "encoding/json" + "encoding/pem" "fmt" + "io/ioutil" "net" "net/url" "reflect" @@ -20,10 +26,12 @@ var ( fieldsRequiredByDefault bool notNumberRegexp = regexp.MustCompile("[^0-9]+") whiteSpacesAndMinus = regexp.MustCompile("[\\s-]+") + paramsRegexp = regexp.MustCompile("\\(.*\\)$") ) const maxURLRuneCount = 2083 const minURLRuneCount = 3 +const RF3339WithoutZone = "2006-01-02T15:04:05" // SetFieldsRequiredByDefault causes validation to fail when struct fields // do not include validations or are not explicitly marked as exempt (using `valid:"-"` or `valid:"email,optional"`). @@ -54,7 +62,13 @@ func IsURL(str string) bool { if str == "" || utf8.RuneCountInString(str) >= maxURLRuneCount || len(str) <= minURLRuneCount || strings.HasPrefix(str, ".") { return false } - u, err := url.Parse(str) + strTemp := str + if strings.Index(str, ":") >= 0 && strings.Index(str, "://") == -1 { + // support no indicated urlscheme but with colon for port number + // http:// is appended so url.Parse will succeed, strTemp used so it does not impact rxURL.MatchString + strTemp = "http://" + str + } + u, err := url.Parse(strTemp) if err != nil { return false } @@ -65,7 +79,6 @@ func IsURL(str string) bool { return false } return rxURL.MatchString(str) - } // IsRequestURL check if the string rawurl, assuming @@ -486,6 +499,33 @@ func IsDNSName(str string) bool { return !IsIP(str) && rxDNSName.MatchString(str) } +// IsHash checks if a string is a hash of type algorithm. +// Algorithm is one of ['md4', 'md5', 'sha1', 'sha256', 'sha384', 'sha512', 'ripemd128', 'ripemd160', 'tiger128', 'tiger160', 'tiger192', 'crc32', 'crc32b'] +func IsHash(str string, algorithm string) bool { + len := "0" + algo := strings.ToLower(algorithm) + + if algo == "crc32" || algo == "crc32b" { + len = "8" + } else if algo == "md5" || algo == "md4" || algo == "ripemd128" || algo == "tiger128" { + len = "32" + } else if algo == "sha1" || algo == "ripemd160" || algo == "tiger160" { + len = "40" + } else if algo == "tiger192" { + len = "48" + } else if algo == "sha256" { + len = "64" + } else if algo == "sha384" { + len = "96" + } else if algo == "sha512" { + len = "128" + } else { + return false + } + + return Matches(str, "^[a-f0-9]{" + len + "}$") +} + // IsDialString validates the given string for usage with the various Dial() functions func IsDialString(str string) bool { @@ -560,6 +600,40 @@ func IsLongitude(str string) bool { return rxLongitude.MatchString(str) } +// IsRsaPublicKey check if a string is valid public key with provided length +func IsRsaPublicKey(str string, keylen int) bool { + bb := bytes.NewBufferString(str) + pemBytes, err := ioutil.ReadAll(bb) + if err != nil { + return false + } + block, _ := pem.Decode(pemBytes) + if block != nil && block.Type != "PUBLIC KEY" { + return false + } + var der []byte + + if block != nil { + der = block.Bytes + } else { + der, err = base64.StdEncoding.DecodeString(str) + if err != nil { + return false + } + } + + key, err := x509.ParsePKIXPublicKey(der) + if err != nil { + return false + } + pubkey, ok := key.(*rsa.PublicKey) + if !ok { + return false + } + bitlen := len(pubkey.N.Bytes()) * 8 + return bitlen == int(keylen) +} + func toJSONName(tag string) string { if tag == "" { return "" @@ -568,7 +642,16 @@ func toJSONName(tag string) string { // JSON name always comes first. If there's no options then split[0] is // JSON name, if JSON name is not set, then split[0] is an empty string. split := strings.SplitN(tag, ",", 2) - return split[0] + + name := split[0] + + // However it is possible that the field is skipped when + // (de-)serializing from/to JSON, in which case assume that there is no + // tag name to use + if name == "-" { + return "" + } + return name } // ValidateStruct use tags for fields. @@ -613,6 +696,14 @@ func ValidateStruct(s interface{}) (bool, error) { jsonError.Name = jsonTag err2 = jsonError case Errors: + for i2, err3 := range jsonError { + switch customErr := err3.(type) { + case Error: + customErr.Name = jsonTag + jsonError[i2] = customErr + } + } + err2 = jsonError } } @@ -630,8 +721,11 @@ func ValidateStruct(s interface{}) (bool, error) { // parseTagIntoMap parses a struct tag `valid:required~Some error message,length(2|3)` into map[string]string{"required": "Some error message", "length(2|3)": ""} func parseTagIntoMap(tag string) tagOptionsMap { optionsMap := make(tagOptionsMap) - options := strings.SplitN(tag, ",", -1) + options := strings.Split(tag, ",") + for _, option := range options { + option = strings.TrimSpace(option) + validationOptions := strings.Split(option, "~") if !isValidTag(validationOptions[0]) { continue @@ -688,6 +782,11 @@ func IsRFC3339(str string) bool { return IsTime(str, time.RFC3339) } +// IsRFC3339WithoutZone check if string is valid timestamp value according to RFC3339 which excludes the timezone. +func IsRFC3339WithoutZone(str string) bool { + return IsTime(str, RF3339WithoutZone) +} + // IsISO4217 check if string is valid ISO currency code func IsISO4217(str string) bool { for _, currency := range ISO4217List { @@ -716,6 +815,17 @@ func RuneLength(str string, params ...string) bool { return StringLength(str, params...) } +// IsRsaPub check whether string is valid RSA key +// Alias for IsRsaPublicKey +func IsRsaPub(str string, params ...string) bool { + if len(params) == 1 { + len, _ := ToInt(params[0]) + return IsRsaPublicKey(str, int(len)) + } + + return false +} + // StringMatches checks if a string matches a given pattern. func StringMatches(s string, params ...string) bool { if len(params) == 1 { @@ -776,11 +886,11 @@ func IsIn(str string, params ...string) bool { func checkRequired(v reflect.Value, t reflect.StructField, options tagOptionsMap) (bool, error) { if requiredOption, isRequired := options["required"]; isRequired { if len(requiredOption) > 0 { - return false, Error{t.Name, fmt.Errorf(requiredOption), true} + return false, Error{t.Name, fmt.Errorf(requiredOption), true, "required"} } - return false, Error{t.Name, fmt.Errorf("non zero value required"), false} + return false, Error{t.Name, fmt.Errorf("non zero value required"), false, "required"} } else if _, isOptional := options["optional"]; fieldsRequiredByDefault && !isOptional { - return false, Error{t.Name, fmt.Errorf("All fields are required to at least have one validation defined"), false} + return false, Error{t.Name, fmt.Errorf("All fields are required to at least have one validation defined"), false, "required"} } // not required and empty is valid return true, nil @@ -799,7 +909,7 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options if !fieldsRequiredByDefault { return true, nil } - return false, Error{t.Name, fmt.Errorf("All fields are required to at least have one validation defined"), false} + return false, Error{t.Name, fmt.Errorf("All fields are required to at least have one validation defined"), false, "required"} case "-": return true, nil } @@ -822,10 +932,10 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options if result := validatefunc(v.Interface(), o.Interface()); !result { if len(customErrorMessage) > 0 { - customTypeErrors = append(customTypeErrors, Error{Name: t.Name, Err: fmt.Errorf(customErrorMessage), CustomErrorMessageExists: true}) + customTypeErrors = append(customTypeErrors, Error{Name: t.Name, Err: fmt.Errorf(customErrorMessage), CustomErrorMessageExists: true, Validator: stripParams(validatorName)}) continue } - customTypeErrors = append(customTypeErrors, Error{Name: t.Name, Err: fmt.Errorf("%s does not validate as %s", fmt.Sprint(v), validatorName), CustomErrorMessageExists: false}) + customTypeErrors = append(customTypeErrors, Error{Name: t.Name, Err: fmt.Errorf("%s does not validate as %s", fmt.Sprint(v), validatorName), CustomErrorMessageExists: false, Validator: stripParams(validatorName)}) } } } @@ -844,7 +954,7 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options for validator := range options { isValid = false resultErr = Error{t.Name, fmt.Errorf( - "The following validator is invalid or can't be applied to the field: %q", validator), false} + "The following validator is invalid or can't be applied to the field: %q", validator), false, stripParams(validator)} return } } @@ -888,16 +998,16 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options field := fmt.Sprint(v) // make value into string, then validate with regex if result := validatefunc(field, ps[1:]...); (!result && !negate) || (result && negate) { if customMsgExists { - return false, Error{t.Name, fmt.Errorf(customErrorMessage), customMsgExists} + return false, Error{t.Name, fmt.Errorf(customErrorMessage), customMsgExists, stripParams(validatorSpec)} } if negate { - return false, Error{t.Name, fmt.Errorf("%s does validate as %s", field, validator), customMsgExists} + return false, Error{t.Name, fmt.Errorf("%s does validate as %s", field, validator), customMsgExists, stripParams(validatorSpec)} } - return false, Error{t.Name, fmt.Errorf("%s does not validate as %s", field, validator), customMsgExists} + return false, Error{t.Name, fmt.Errorf("%s does not validate as %s", field, validator), customMsgExists, stripParams(validatorSpec)} } default: // type not yet supported, fail - return false, Error{t.Name, fmt.Errorf("Validator %s doesn't support kind %s", validator, v.Kind()), false} + return false, Error{t.Name, fmt.Errorf("Validator %s doesn't support kind %s", validator, v.Kind()), false, stripParams(validatorSpec)} } } @@ -909,17 +1019,17 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options field := fmt.Sprint(v) // make value into string, then validate with regex if result := validatefunc(field); !result && !negate || result && negate { if customMsgExists { - return false, Error{t.Name, fmt.Errorf(customErrorMessage), customMsgExists} + return false, Error{t.Name, fmt.Errorf(customErrorMessage), customMsgExists, stripParams(validatorSpec)} } if negate { - return false, Error{t.Name, fmt.Errorf("%s does validate as %s", field, validator), customMsgExists} + return false, Error{t.Name, fmt.Errorf("%s does validate as %s", field, validator), customMsgExists, stripParams(validatorSpec)} } - return false, Error{t.Name, fmt.Errorf("%s does not validate as %s", field, validator), customMsgExists} + return false, Error{t.Name, fmt.Errorf("%s does not validate as %s", field, validator), customMsgExists, stripParams(validatorSpec)} } default: //Not Yet Supported Types (Fail here!) err := fmt.Errorf("Validator %s doesn't support kind %s for value %v", validator, v.Kind(), v) - return false, Error{t.Name, err, false} + return false, Error{t.Name, err, false, stripParams(validatorSpec)} } } } @@ -933,9 +1043,18 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options sort.Sort(sv) result := true for _, k := range sv { - resultItem, err := ValidateStruct(v.MapIndex(k).Interface()) - if err != nil { - return false, err + var resultItem bool + var err error + if v.MapIndex(k).Kind() != reflect.Struct { + resultItem, err = typeCheck(v.MapIndex(k), t, o, options) + if err != nil { + return false, err + } + } else { + resultItem, err = ValidateStruct(v.MapIndex(k).Interface()) + if err != nil { + return false, err + } } result = result && resultItem } @@ -978,6 +1097,10 @@ func typeCheck(v reflect.Value, t reflect.StructField, o reflect.Value, options } } +func stripParams(validatorString string) string { + return paramsRegexp.ReplaceAllString(validatorString, "") +} + func isEmptyValue(v reflect.Value) bool { switch v.Kind() { case reflect.String, reflect.Array: diff --git a/vendor/github.com/asaskevich/govalidator/validator_test.go b/vendor/github.com/asaskevich/govalidator/validator_test.go index 3537232d5..cf56f7a5a 100644 --- a/vendor/github.com/asaskevich/govalidator/validator_test.go +++ b/vendor/github.com/asaskevich/govalidator/validator_test.go @@ -536,6 +536,37 @@ func TestIsInt(t *testing.T) { } } + +func TestIsHash(t *testing.T) { + t.Parallel() + + var tests = []struct { + param string + algo string + expected bool + }{ + {"3ca25ae354e192b26879f651a51d92aa8a34d8d3", "sha1", true}, + {"3ca25ae354e192b26879f651a51d34d8d3", "sha1", false}, + {"3ca25ae354e192b26879f651a51d92aa8a34d8d3", "Tiger160", true}, + {"3ca25ae354e192b26879f651a51d34d8d3", "ripemd160", false}, + {"579282cfb65ca1f109b78536effaf621b853c9f7079664a3fbe2b519f435898c", "sha256", true}, + {"579282cfb65ca1f109b78536effaf621b853c9f7079664a3fbe2b519f435898casfdsafsadfsdf", "sha256", false}, + {"bf547c3fc5841a377eb1519c2890344dbab15c40ae4150b4b34443d2212e5b04aa9d58865bf03d8ae27840fef430b891", "sha384", true}, + {"579282cfb65ca1f109b78536effaf621b853c9f7079664a3fbe2b519f435898casfdsafsadfsdf", "sha384", false}, + {"45bc5fa8cb45ee408c04b6269e9f1e1c17090c5ce26ffeeda2af097735b29953ce547e40ff3ad0d120e5361cc5f9cee35ea91ecd4077f3f589b4d439168f91b9", "sha512", true}, + {"579282cfb65ca1f109b78536effaf621b853c9f7079664a3fbe2b519f435898casfdsafsadfsdf", "sha512", false}, + {"46fc0125a148788a3ac1d649566fc04eb84a746f1a6e4fa7", "tiger192", true}, + {"46fc0125a148788a3ac1d649566fc04eb84a746f1a6$$%@^", "TIGER192", false}, + {"46fc0125a148788a3ac1d649566fc04eb84a746f1a6$$%@^", "SOMEHASH", false}, + } + for _, test := range tests { + actual := IsHash(test.param, test.algo) + if actual != test.expected { + t.Errorf("Expected IsHash(%q, %q) to be %v, got %v", test.param, test.algo, test.expected, actual) + } + } +} + func TestIsEmail(t *testing.T) { t.Parallel() @@ -633,6 +664,7 @@ func TestIsURL(t *testing.T) { {"https://pbs.twimg.com/profile_images/560826135676588032/j8fWrmYY_normal.jpeg", true}, // according to #125 {"http://prometheus-alertmanager.service.q:9093", true}, + {"aio1_alertmanager_container-63376c45:9093", true}, {"https://www.logn-123-123.url.with.sigle.letter.d:12345/url/path/foo?bar=zzz#user", true}, {"http://me.example.com", true}, {"http://www.me.example.com", true}, @@ -661,6 +693,10 @@ func TestIsURL(t *testing.T) { {"foo_bar.example.com", true}, {"foo_bar_fizz_buzz.example.com", true}, {"http://hello_world.example.com", true}, + // According to #212 + {"foo_bar-fizz-buzz:1313", true}, + {"foo_bar-fizz-buzz:13:13", false}, + {"foo_bar-fizz-buzz://1313", false}, } for _, test := range tests { actual := IsURL(test.param) @@ -980,6 +1016,7 @@ func TestIsMultibyte(t *testing.T) { {"test@example.com", true}, {"1234abcDExyz", true}, {"カタカナ", true}, + {"", true}, } for _, test := range tests { actual := IsMultibyte(test.param) @@ -1850,6 +1887,13 @@ func TestIsTime(t *testing.T) { {"2016-12-31T11:00:00.05Z", time.RFC3339, true}, {"2016-12-31T11:00:00.05-01:00", time.RFC3339, true}, {"2016-12-31T11:00:00.05+01:00", time.RFC3339, true}, + {"2016-12-31T11:00:00", RF3339WithoutZone, true}, + {"2016-12-31T11:00:00Z", RF3339WithoutZone, false}, + {"2016-12-31T11:00:00+01:00", RF3339WithoutZone, false}, + {"2016-12-31T11:00:00-01:00", RF3339WithoutZone, false}, + {"2016-12-31T11:00:00.05Z", RF3339WithoutZone, false}, + {"2016-12-31T11:00:00.05-01:00", RF3339WithoutZone, false}, + {"2016-12-31T11:00:00.05+01:00", RF3339WithoutZone, false}, } for _, test := range tests { actual := IsTime(test.param, test.format) @@ -2162,7 +2206,7 @@ func TestInvalidValidator(t *testing.T) { invalidStruct := InvalidStruct{1} if valid, err := ValidateStruct(&invalidStruct); valid || err == nil || - err.Error() != `Field: The following validator is invalid or can't be applied to the field: "someInvalidValidator";` { + err.Error() != `Field: The following validator is invalid or can't be applied to the field: "someInvalidValidator"` { t.Errorf("Got an unexpected result for struct with invalid validator: %t %s", valid, err) } } @@ -2184,12 +2228,12 @@ func TestCustomValidator(t *testing.T) { t.Errorf("Got an unexpected result for struct with custom always true validator: %t %s", valid, err) } - if valid, err := ValidateStruct(&InvalidStruct{Field: 1}); valid || err == nil || err.Error() != "Custom validator error;;" { + if valid, err := ValidateStruct(&InvalidStruct{Field: 1}); valid || err == nil || err.Error() != "Custom validator error" { t.Errorf("Got an unexpected result for struct with custom always false validator: %t %s", valid, err) } mixedStruct := StructWithCustomAndBuiltinValidator{} - if valid, err := ValidateStruct(&mixedStruct); valid || err == nil || err.Error() != "Field: non zero value required;" { + if valid, err := ValidateStruct(&mixedStruct); valid || err == nil || err.Error() != "Field: non zero value required" { t.Errorf("Got an unexpected result for invalid struct with custom and built-in validators: %t %s", valid, err) } @@ -2522,6 +2566,8 @@ func TestValidateStruct(t *testing.T) { type testByteArray [8]byte type testByteMap map[byte]byte type testByteSlice []byte +type testStringStringMap map[string]string +type testStringIntMap map[string]int func TestRequired(t *testing.T) { @@ -2606,6 +2652,22 @@ func TestRequired(t *testing.T) { }{}, false, }, + { + struct { + TestStringStringMap testStringStringMap `valid:"required"` + }{ + testStringStringMap{"test": "test"}, + }, + true, + }, + { + struct { + TestIntMap testStringIntMap `valid:"required"` + }{ + testStringIntMap{"test": 42}, + }, + true, + }, } for _, test := range tests { actual, err := ValidateStruct(test.param) @@ -2693,7 +2755,7 @@ func TestErrorsByField(t *testing.T) { {"CustomField", "An error occurred"}, } - err = Error{"CustomField", fmt.Errorf("An error occurred"), false} + err = Error{"CustomField", fmt.Errorf("An error occurred"), false, "hello"} errs = ErrorsByField(err) if len(errs) != 1 { @@ -2838,10 +2900,11 @@ func TestOptionalCustomValidators(t *testing.T) { func TestJSONValidator(t *testing.T) { var val struct { - WithJSONName string `json:"with_json_name" valid:"-,required"` - WithoutJSONName string `valid:"-,required"` - WithJSONOmit string `json:"with_other_json_name,omitempty" valid:"-,required"` - WithJSONOption string `json:",omitempty" valid:"-,required"` + WithJSONName string `json:"with_json_name" valid:"-,required"` + WithoutJSONName string `valid:"-,required"` + WithJSONOmit string `json:"with_other_json_name,omitempty" valid:"-,required"` + WithJSONOption string `json:",omitempty" valid:"-,required"` + WithEmptyJSONName string `json:"-" valid:"-,required"` } _, err := ValidateStruct(val) @@ -2861,4 +2924,117 @@ func TestJSONValidator(t *testing.T) { if Contains(err.Error(), "omitempty") { t.Errorf("Expected error message to not contain ',omitempty' but actual error is: %s", err.Error()) } + + if !Contains(err.Error(), "WithEmptyJSONName") { + t.Errorf("Expected error message to contain WithEmptyJSONName but actual error is: %s", err.Error()) + } +} + +func TestValidatorIncludedInError(t *testing.T) { + post := Post{ + Title: "", + Message: "👍", + AuthorIP: "xyz", + } + + validatorMap := map[string]string{ + "Title": "required", + "Message": "ascii", + "AuthorIP": "ipv4", + } + + ok, errors := ValidateStruct(post) + if ok { + t.Errorf("expected validation to fail %v", ok) + } + + for _, e := range errors.(Errors) { + casted := e.(Error) + if validatorMap[casted.Name] != casted.Validator { + t.Errorf("expected validator for %s to be %s, but was %s", casted.Name, validatorMap[casted.Name], casted.Validator) + } + } + + // check to make sure that validators with arguments (like length(1|10)) don't include the arguments + // in the validator name + message := MessageWithSeveralFieldsStruct{ + Title: "", + Body: "asdfasdfasdfasdfasdf", + } + + validatorMap = map[string]string{ + "Title": "length", + "Body": "length", + } + + ok, errors = ValidateStruct(message) + if ok { + t.Errorf("expected validation to fail, %v", ok) + } + + for _, e := range errors.(Errors) { + casted := e.(Error) + if validatorMap[casted.Name] != casted.Validator { + t.Errorf("expected validator for %s to be %s, but was %s", casted.Name, validatorMap[casted.Name], casted.Validator) + } + } + + // make sure validators with custom messages don't show up in the validator string + type CustomMessage struct { + Text string `valid:"length(1|10)~Custom message"` + } + cs := CustomMessage{Text: "asdfasdfasdfasdf"} + + ok, errors = ValidateStruct(&cs) + if ok { + t.Errorf("expected validation to fail, %v", ok) + } + + validator := errors.(Errors)[0].(Error).Validator + if validator != "length" { + t.Errorf("expected validator for Text to be length, but was %s", validator) + } + +} + +func TestIsRsaPublicKey(t *testing.T) { + var tests = []struct { + rsastr string + keylen int + expected bool + }{ + {`fubar`, 2048, false}, + {`MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvncDCeibmEkabJLmFec7x9y86RP6dIvkVxxbQoOJo06E+p7tH6vCmiGHKnuu +XwKYLq0DKUE3t/HHsNdowfD9+NH8caLzmXqGBx45/Dzxnwqz0qYq7idK+Qff34qrk/YFoU7498U1Ee7PkKb7/VE9BmMEcI3uoKbeXCbJRI +HoTp8bUXOpNTSUfwUNwJzbm2nsHo2xu6virKtAZLTsJFzTUmRd11MrWCvj59lWzt1/eIMN+ekjH8aXeLOOl54CL+kWp48C+V9BchyKCShZ +B7ucimFvjHTtuxziXZQRO7HlcsBOa0WwvDJnRnskdyoD31s4F4jpKEYBJNWTo63v6lUvbQIDAQAB`, 2048, true}, + {`MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvncDCeibmEkabJLmFec7x9y86RP6dIvkVxxbQoOJo06E+p7tH6vCmiGHKnuu +XwKYLq0DKUE3t/HHsNdowfD9+NH8caLzmXqGBx45/Dzxnwqz0qYq7idK+Qff34qrk/YFoU7498U1Ee7PkKb7/VE9BmMEcI3uoKbeXCbJRI +HoTp8bUXOpNTSUfwUNwJzbm2nsHo2xu6virKtAZLTsJFzTUmRd11MrWCvj59lWzt1/eIMN+ekjH8aXeLOOl54CL+kWp48C+V9BchyKCShZ +B7ucimFvjHTtuxziXZQRO7HlcsBOa0WwvDJnRnskdyoD31s4F4jpKEYBJNWTo63v6lUvbQIDAQAB`, 1024, false}, + {`-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvncDCeibmEkabJLmFec7 +x9y86RP6dIvkVxxbQoOJo06E+p7tH6vCmiGHKnuuXwKYLq0DKUE3t/HHsNdowfD9 ++NH8caLzmXqGBx45/Dzxnwqz0qYq7idK+Qff34qrk/YFoU7498U1Ee7PkKb7/VE9 +BmMEcI3uoKbeXCbJRIHoTp8bUXOpNTSUfwUNwJzbm2nsHo2xu6virKtAZLTsJFzT +UmRd11MrWCvj59lWzt1/eIMN+ekjH8aXeLOOl54CL+kWp48C+V9BchyKCShZB7uc +imFvjHTtuxziXZQRO7HlcsBOa0WwvDJnRnskdyoD31s4F4jpKEYBJNWTo63v6lUv +bQIDAQAB +-----END PUBLIC KEY-----`, 2048, true}, + {`-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAvncDCeibmEkabJLmFec7 +x9y86RP6dIvkVxxbQoOJo06E+p7tH6vCmiGHKnuuXwKYLq0DKUE3t/HHsNdowfD9 ++NH8caLzmXqGBx45/Dzxnwqz0qYq7idK+Qff34qrk/YFoU7498U1Ee7PkKb7/VE9 +BmMEcI3uoKbeXCbJRIHoTp8bUXOpNTSUfwUNwJzbm2nsHo2xu6virKtAZLTsJFzT +UmRd11MrWCvj59lWzt1/eIMN+ekjH8aXeLOOl54CL+kWp48C+V9BchyKCShZB7uc +imFvjHTtuxziXZQRO7HlcsBOa0WwvDJnRnskdyoD31s4F4jpKEYBJNWTo63v6lUv +bQIDAQAB +-----END PUBLIC KEY-----`, 4096, false}, + } + for i, test := range tests { + actual := IsRsaPublicKey(test.rsastr, test.keylen) + if actual != test.expected { + t.Errorf("Expected TestIsRsaPublicKey(%d, %d) to be %v, got %v", i, test.keylen, test.expected, actual) + } + } } diff --git a/vendor/github.com/go-openapi/swag/.gitignore b/vendor/github.com/go-openapi/swag/.gitignore index 769c24400..5862205ee 100644 --- a/vendor/github.com/go-openapi/swag/.gitignore +++ b/vendor/github.com/go-openapi/swag/.gitignore @@ -1 +1,3 @@ secrets.yml +vendor +Godeps diff --git a/vendor/github.com/go-openapi/swag/path_test.go b/vendor/github.com/go-openapi/swag/path_test.go index 743bba534..c5a670646 100644 --- a/vendor/github.com/go-openapi/swag/path_test.go +++ b/vendor/github.com/go-openapi/swag/path_test.go @@ -17,8 +17,8 @@ package swag import ( "io/ioutil" "os" - "path/filepath" "path" + "path/filepath" "runtime" "testing" @@ -75,7 +75,7 @@ func TestFindPackage(t *testing.T) { os.RemoveAll(pth2) }() - searchPath := pth + string(filepath.ListSeparator) + pth2 + searchPath := pth + string(filepath.ListSeparator) + pth2 // finds package when real name mentioned pkg := FindInSearchPath(searchPath, "foo/bar") assert.NotEmpty(t, pkg) diff --git a/vendor/github.com/go-openapi/swag/util.go b/vendor/github.com/go-openapi/swag/util.go index 40751aab4..0efb41735 100644 --- a/vendor/github.com/go-openapi/swag/util.go +++ b/vendor/github.com/go-openapi/swag/util.go @@ -40,6 +40,7 @@ var commonInitialisms = map[string]bool{ "IP": true, "JSON": true, "LHS": true, + "OAI": true, "QPS": true, "RAM": true, "RHS": true, @@ -163,8 +164,8 @@ func split(str string) (words []string) { // Split when uppercase is found (needed for Snake) str = rex1.ReplaceAllString(str, " $1") - // check if consecutive single char things make up an initialism + // check if consecutive single char things make up an initialism for _, k := range initialisms { str = strings.Replace(str, rex1.ReplaceAllString(k, " $1"), " "+k, -1) } @@ -189,10 +190,47 @@ func lower(str string) string { return strings.ToLower(trim(str)) } +// Camelize an uppercased word +func Camelize(word string) (camelized string) { + for pos, ru := range word { + if pos > 0 { + camelized += string(unicode.ToLower(ru)) + } else { + camelized += string(unicode.ToUpper(ru)) + } + } + return +} + // ToFileName lowercases and underscores a go type name func ToFileName(name string) string { var out []string - for _, w := range split(name) { + cml := trim(name) + + // Camelize any capital word preceding a reserved keyword ("initialism") + // thus, upper-cased words preceding a common initialism will get separated + // e.g: ELBHTTPLoadBalancer becomes elb_http_load_balancer + rexPrevious := regexp.MustCompile(`(?P<word>\p{Lu}{2,})(?:HTTP|OAI)`) + cml = rexPrevious.ReplaceAllStringFunc(cml, func(match string) (replaceInMatch string) { + for _, m := range rexPrevious.FindAllStringSubmatch(match, -1) { // [ match submatch ] + if m[1] != "" { + replaceInMatch = strings.Replace(m[0], m[1], Camelize(m[1]), -1) + } + } + return + }) + + // Pre-camelize reserved keywords ("initialisms") to avoid unnecessary hyphenization + for _, k := range initialisms { + cml = strings.Replace(cml, k, Camelize(k), -1) + } + + // Camelize other capital words to avoid unnecessary hyphenization + rexCase := regexp.MustCompile(`(\p{Lu}{2,})`) + cml = rexCase.ReplaceAllStringFunc(cml, Camelize) + + // Final split with hyphens + for _, w := range split(cml) { out = append(out, lower(w)) } return strings.Join(out, "_") diff --git a/vendor/github.com/go-openapi/swag/util_test.go b/vendor/github.com/go-openapi/swag/util_test.go index 5db20b90f..3aeb925fa 100644 --- a/vendor/github.com/go-openapi/swag/util_test.go +++ b/vendor/github.com/go-openapi/swag/util_test.go @@ -39,6 +39,7 @@ func TestToGoName(t *testing.T) { {"findThingById", "FindThingByID"}, {"日本語sample 2 Text", "X日本語sample2Text"}, {"日本語findThingById", "X日本語findThingByID"}, + {"findTHINGSbyID", "FindTHINGSbyID"}, } for k := range commonInitialisms { @@ -122,8 +123,16 @@ func TestToFileName(t *testing.T) { samples := []translationSample{ {"SampleText", "sample_text"}, {"FindThingByID", "find_thing_by_id"}, + {"CAPWD.folwdBYlc", "capwd_folwd_bylc"}, + {"CAPWDfolwdBYlc", "capwdfolwd_bylc"}, + {"CAP_WD_folwdBYlc", "cap_wd_folwd_bylc"}, + {"TypeOAI_alias", "type_oai_alias"}, + {"Type_OAI_alias", "type_oai_alias"}, + {"Type_OAIAlias", "type_oai_alias"}, + {"ELB.HTTPLoadBalancer", "elb_http_load_balancer"}, + {"elbHTTPLoadBalancer", "elb_http_load_balancer"}, + {"ELBHTTPLoadBalancer", "elb_http_load_balancer"}, } - for k := range commonInitialisms { samples = append(samples, translationSample{"Sample" + k + "Text", "sample_" + lower(k) + "_text"}, diff --git a/vendor/golang.org/x/sys/unix/file_unix.go b/vendor/golang.org/x/sys/unix/file_unix.go deleted file mode 100644 index 47f6a83f2..000000000 --- a/vendor/golang.org/x/sys/unix/file_unix.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package unix - -import ( - "os" - "syscall" -) - -// FIXME: unexported function from os -// syscallMode returns the syscall-specific mode bits from Go's portable mode bits. -func syscallMode(i os.FileMode) (o uint32) { - o |= uint32(i.Perm()) - if i&os.ModeSetuid != 0 { - o |= syscall.S_ISUID - } - if i&os.ModeSetgid != 0 { - o |= syscall.S_ISGID - } - if i&os.ModeSticky != 0 { - o |= syscall.S_ISVTX - } - // No mapping for Go's ModeTemporary (plan9 only). - return -} diff --git a/vendor/golang.org/x/sys/unix/syscall_linux.go b/vendor/golang.org/x/sys/unix/syscall_linux.go index b98a7e154..9098661a3 100644 --- a/vendor/golang.org/x/sys/unix/syscall_linux.go +++ b/vendor/golang.org/x/sys/unix/syscall_linux.go @@ -1125,6 +1125,10 @@ func PtracePokeData(pid int, addr uintptr, data []byte) (count int, err error) { return ptracePoke(PTRACE_POKEDATA, PTRACE_PEEKDATA, pid, addr, data) } +func PtracePokeUser(pid int, addr uintptr, data []byte) (count int, err error) { + return ptracePoke(PTRACE_POKEUSR, PTRACE_PEEKUSR, pid, addr, data) +} + func PtraceGetRegs(pid int, regsout *PtraceRegs) (err error) { return ptrace(PTRACE_GETREGS, pid, 0, uintptr(unsafe.Pointer(regsout))) } diff --git a/vendor/golang.org/x/sys/windows/syscall_windows.go b/vendor/golang.org/x/sys/windows/syscall_windows.go index bb778dbd2..f48fec60d 100644 --- a/vendor/golang.org/x/sys/windows/syscall_windows.go +++ b/vendor/golang.org/x/sys/windows/syscall_windows.go @@ -796,6 +796,75 @@ func ConnectEx(fd Handle, sa Sockaddr, sendBuf *byte, sendDataLen uint32, bytesS return connectEx(fd, ptr, n, sendBuf, sendDataLen, bytesSent, overlapped) } +var sendRecvMsgFunc struct { + once sync.Once + sendAddr uintptr + recvAddr uintptr + err error +} + +func loadWSASendRecvMsg() error { + sendRecvMsgFunc.once.Do(func() { + var s Handle + s, sendRecvMsgFunc.err = Socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) + if sendRecvMsgFunc.err != nil { + return + } + defer CloseHandle(s) + var n uint32 + sendRecvMsgFunc.err = WSAIoctl(s, + SIO_GET_EXTENSION_FUNCTION_POINTER, + (*byte)(unsafe.Pointer(&WSAID_WSARECVMSG)), + uint32(unsafe.Sizeof(WSAID_WSARECVMSG)), + (*byte)(unsafe.Pointer(&sendRecvMsgFunc.recvAddr)), + uint32(unsafe.Sizeof(sendRecvMsgFunc.recvAddr)), + &n, nil, 0) + if sendRecvMsgFunc.err != nil { + return + } + sendRecvMsgFunc.err = WSAIoctl(s, + SIO_GET_EXTENSION_FUNCTION_POINTER, + (*byte)(unsafe.Pointer(&WSAID_WSASENDMSG)), + uint32(unsafe.Sizeof(WSAID_WSASENDMSG)), + (*byte)(unsafe.Pointer(&sendRecvMsgFunc.sendAddr)), + uint32(unsafe.Sizeof(sendRecvMsgFunc.sendAddr)), + &n, nil, 0) + }) + return sendRecvMsgFunc.err +} + +func WSASendMsg(fd Handle, msg *WSAMsg, flags uint32, bytesSent *uint32, overlapped *Overlapped, croutine *byte) error { + err := loadWSASendRecvMsg() + if err != nil { + return err + } + r1, _, e1 := syscall.Syscall6(sendRecvMsgFunc.sendAddr, 6, uintptr(fd), uintptr(unsafe.Pointer(msg)), uintptr(flags), uintptr(unsafe.Pointer(bytesSent)), uintptr(unsafe.Pointer(overlapped)), uintptr(unsafe.Pointer(croutine))) + if r1 == socket_error { + if e1 != 0 { + err = errnoErr(e1) + } else { + err = syscall.EINVAL + } + } + return err +} + +func WSARecvMsg(fd Handle, msg *WSAMsg, bytesReceived *uint32, overlapped *Overlapped, croutine *byte) error { + err := loadWSASendRecvMsg() + if err != nil { + return err + } + r1, _, e1 := syscall.Syscall6(sendRecvMsgFunc.recvAddr, 5, uintptr(fd), uintptr(unsafe.Pointer(msg)), uintptr(unsafe.Pointer(bytesReceived)), uintptr(unsafe.Pointer(overlapped)), uintptr(unsafe.Pointer(croutine)), 0) + if r1 == socket_error { + if e1 != 0 { + err = errnoErr(e1) + } else { + err = syscall.EINVAL + } + } + return err +} + // Invented structures to support what package os expects. type Rusage struct { CreationTime Filetime diff --git a/vendor/golang.org/x/sys/windows/types_windows.go b/vendor/golang.org/x/sys/windows/types_windows.go index 0229f79cf..78b714c0d 100644 --- a/vendor/golang.org/x/sys/windows/types_windows.go +++ b/vendor/golang.org/x/sys/windows/types_windows.go @@ -29,6 +29,7 @@ const ( ERROR_NOT_FOUND syscall.Errno = 1168 ERROR_PRIVILEGE_NOT_HELD syscall.Errno = 1314 WSAEACCES syscall.Errno = 10013 + WSAEMSGSIZE syscall.Errno = 10040 WSAECONNRESET syscall.Errno = 10054 ) @@ -567,6 +568,16 @@ const ( IPV6_JOIN_GROUP = 0xc IPV6_LEAVE_GROUP = 0xd + MSG_OOB = 0x1 + MSG_PEEK = 0x2 + MSG_DONTROUTE = 0x4 + MSG_WAITALL = 0x8 + + MSG_TRUNC = 0x0100 + MSG_CTRUNC = 0x0200 + MSG_BCAST = 0x0400 + MSG_MCAST = 0x0800 + SOMAXCONN = 0x7fffffff TCP_NODELAY = 1 @@ -584,6 +595,15 @@ type WSABuf struct { Buf *byte } +type WSAMsg struct { + Name *syscall.RawSockaddrAny + Namelen int32 + Buffers *WSABuf + BufferCount uint32 + Control WSABuf + Flags uint32 +} + // Invented values to support what package os expects. const ( S_IFMT = 0x1f000 @@ -1011,6 +1031,20 @@ var WSAID_CONNECTEX = GUID{ [8]byte{0x8e, 0xe9, 0x76, 0xe5, 0x8c, 0x74, 0x06, 0x3e}, } +var WSAID_WSASENDMSG = GUID{ + 0xa441e712, + 0x754f, + 0x43ca, + [8]byte{0x84, 0xa7, 0x0d, 0xee, 0x44, 0xcf, 0x60, 0x6d}, +} + +var WSAID_WSARECVMSG = GUID{ + 0xf689d7c8, + 0x6f1f, + 0x436b, + [8]byte{0x8a, 0x53, 0xe5, 0x4f, 0xe3, 0x51, 0xc3, 0x22}, +} + const ( FILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 1 FILE_SKIP_SET_EVENT_ON_HANDLE = 2 |